开发者

SQLAlchemy memory hog on select statement

As p开发者_开发百科er the SQLAlchemy, select statements are treated as iterables in for loops. The effect is that a select statement that would return a massive amount of rows does not use excessive memory.

I am finding that the following statement on a MySQL table:

for row in my_connections.execute(MyTable.__table__.select()):
    yield row

Does not seem to follow this, as I overflow available memory and begin thrashing before the first row is yielded. What am I doing wrong?


The basic MySQLdb cursor fetches the entire query result at once from the server. This can consume a lot of memory and time. Use MySQLdb.cursors.SSCursor when you want to make a huge query and pull results from the server one at a time.

Therefore, try passing connect_args={'cursorclass': MySQLdb.cursors.SSCursor} when creating the engine:

   from sqlalchemy import create_engine, MetaData
   import MySQLdb.cursors
   engine = create_engine('mysql://root:zenoss@localhost/e2', connect_args={'cursorclass': MySQLdb.cursors.SSCursor})
   meta = MetaData(engine, reflect=True)
   conn = engine.connect()
   rs = s.execution_options(stream_results=True).execute()

See http://www.sqlalchemy.org/trac/ticket/1089


Note that using SSCursor locks the table until the fetch is complete. This affects other cursors using the same connection: Two cursors from the same connection can not read from the table concurrently.

However, cursors from different connections can read from the same table concurrently.

Here is some code demonstrating the problem:

import MySQLdb
import MySQLdb.cursors as cursors
import threading
import logging
import config

logger = logging.getLogger(__name__)
query = 'SELECT * FROM huge_table LIMIT 200'

def oursql_conn():
    import oursql
    conn = oursql.connect(
        host=config.HOST, user=config.USER, passwd=config.PASS,
        db=config.MYDB)
    return conn

def mysqldb_conn():
    conn = MySQLdb.connect(
        host=config.HOST, user=config.USER,
        passwd=config.PASS, db=config.MYDB,
        cursorclass=cursors.SSCursor) 
    return conn

def two_cursors_one_conn():
    """Two SSCursors can not use one connection concurrently"""
    def worker(conn):
        cursor = conn.cursor()
        cursor.execute(query)
        for row in cursor:
            logger.info(row)

    conn = mysqldb_conn()
    threads = [threading.Thread(target=worker, args=(conn, ))
               for n in range(2)]
    for t in threads:
        t.daemon = True
        t.start()
        # Second thread may hang or raise OperationalError:
        # File "/usr/lib/pymodules/python2.7/MySQLdb/cursors.py", line 289, in _fetch_row
        #   return self._result.fetch_row(size, self._fetch_type)
        # OperationalError: (2013, 'Lost connection to MySQL server during query')

    for t in threads:
        t.join()

def two_cursors_two_conn():
    """Two SSCursors from independent connections can use the same table concurrently"""    
    def worker():
        conn = mysqldb_conn()        
        cursor = conn.cursor()
        cursor.execute(query)
        for row in cursor:
            logger.info(row)

    threads = [threading.Thread(target=worker) for n in range(2)]
    for t in threads:
        t.daemon = True
        t.start()
    for t in threads:
        t.join()


logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
two_cursors_one_conn()
two_cursors_two_conn()

Note that oursql is an alternative set of MySQL bindings for Python. oursql cursors are true server-side cursors which fetch rows lazily by default. With oursql installed, if you change

conn = mysqldb_conn()

to

conn = oursql_conn()

then two_cursors_one_conn() runs without hanging or raising an exception.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜