SQLAlchemy manejo adecuado de sesión en aplicaciones multi-hilo

Tengo problemas para entender cómo abrir y cerrar las sesiones de la base de datos de manera eficiente, como entendí en la documentación de sqlalchemy, si uso scoped_session para construir mi objeto Session y luego el objeto Session devuelto para crear sesiones, es seguro para subprocesos, así que básicamente cada hilo. obtendrá su propia sesión, y no habrá problemas con ella. Ahora el ejemplo a continuación funciona, lo coloco en un bucle infinito para ver si cierra las sesiones correctamente y si lo controlé correctamente (en mysql ejecutando “SHOW PROCESSLIST;”), las conexiones siguen creciendo, no las cierra. , aunque utilicé session.close (), e incluso elimine el objeto scoped_session al final de cada ejecución. ¿Qué estoy haciendo mal? Mi objective en una aplicación más grande es usar el número mínimo de conexiones de base de datos requeridas, porque mi implementación actual crea una nueva sesión en cada método donde se requiere y la cierra antes de regresar, lo que parece ineficaz.

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from threading import Thread from Queue import Queue, Empty as QueueEmpty from models import MyModel DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname' class MTWorker(object): def __init__(self, worker_count=5): self.task_queue = Queue() self.worker_count = worker_count self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) self.DBSession = scoped_session( sessionmaker( autoflush=True, autocommit=False, bind=self.db_engine ) ) def _worker(self): db_session = self.DBSession() while True: try: task_id = self.task_queue.get(False) try: item = db_session.query(MyModel).filter(MyModel.id == task_id).one() # do something with item except Exception as exc: # if an error occurrs we skip it continue finally: db_session.commit() self.task_queue.task_done() except QueueEmpty: db_session.close() return def start(self): try: db_session = self.DBSession() all_items = db_session.query(MyModel).all() for item in all_items: self.task_queue.put(item.id) for _i in range(self.worker_count): t = Thread(target=self._worker) t.start() self.task_queue.join() finally: db_session.close() self.DBSession.remove() if __name__ == '__main__': while True: mt_worker = MTWorker(worker_count=50) mt_worker.start() 

Solo debe llamar a create_engine y scoped_session una vez por proceso (por base de datos). Cada uno obtendrá su propio grupo de conexiones o sesiones (respectivamente), por lo que desea asegurarse de que solo está creando un grupo. Solo hazlo a nivel de módulo global. si necesita administrar sus sesiones más rápidamente que eso, probablemente no debería usar scoped_session

Otro cambio que se debe hacer es usar DBSession directamente como si fuera una sesión. los métodos de sesión de llamada en scoped_session crearán de forma transparente una sesión de subproceso local, si es necesario, y reenviarán la llamada de método a la sesión.

Otra cosa a tener en cuenta es el pool_size de la agrupación de conexiones, que es 5 de forma predeterminada. Para muchas aplicaciones está bien, pero si está creando muchos subprocesos, es posible que deba ajustar ese parámetro

 DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname' db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) DBSession = scoped_session( sessionmaker( autoflush=True, autocommit=False, bind=db_engine ) ) class MTWorker(object): def __init__(self, worker_count=5): self.task_queue = Queue() self.worker_count = worker_count # snip