Inserto a granel con SQLAlchemy ORM

¿Hay alguna forma de hacer que SQLAlchemy realice una inserción masiva en lugar de insertar cada objeto individual? es decir,

obra:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3) 

más bien que:

 INSERT INTO `foo` (`bar`) VALUES (1) INSERT INTO `foo` (`bar`) VALUES (2) INSERT INTO `foo` (`bar`) VALUES (3) 

Acabo de convertir algunos códigos para usar sqlalchemy en lugar de raw sql y aunque ahora es mucho más agradable trabajar con él, parece ser más lento ahora (hasta un factor de 10), me pregunto si esta es la razón.

Puede ser que pueda mejorar la situación usando sesiones más eficientemente. En este momento tengo autoCommit=False y hago una session.commit() después de haber agregado algunas cosas. Si bien esto parece hacer que los datos se vuelvan obsoletos si se cambia la base de datos en otro lugar, como si hago una nueva consulta, ¿todavía recupero los resultados anteriores?

¡Gracias por tu ayuda!

SQLAlchemy introdujo eso en la versión 1.0.0 :

Operaciones masivas – documentos de SQLAlchemy

¡Con estas operaciones, ahora puede hacer inserciones masivas o actualizaciones!

Por ejemplo, puedes hacer:

 s = Session() objects = [ User(name="u1"), User(name="u2"), User(name="u3") ] s.bulk_save_objects(objects) s.commit() 

Aquí, se hará un inserto a granel.

Que yo sepa, no hay forma de que el ORM emita inserciones masivas. Creo que la razón subyacente es que SQLAlchemy necesita realizar un seguimiento de la identidad de cada objeto (es decir, nuevas claves primarias), y las inserciones masivas interfieren con eso. Por ejemplo, asumiendo que su tabla foo contiene una columna de id y se asigna a una clase Foo :

 x = Foo(bar=1) print x.id # None session.add(x) session.flush() # BEGIN # INSERT INTO foo (bar) VALUES(1) # COMMIT print x.id # 1 

Dado que SQLAlchemy recogió el valor de x.id sin emitir otra consulta, podemos inferir que obtuvo el valor directamente de la statement INSERT . Si no necesita un acceso posterior a los objetos creados a través de las mismas instancias, puede omitir la capa ORM para su inserción:

 Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}]) # INSERT INTO foo (bar) VALUES ((1,), (2,), (3,)) 

SQLAlchemy no puede hacer coincidir estas nuevas filas con ningún objeto existente, por lo que tendrá que consultarlas nuevamente para cualquier operación posterior.

En lo que respecta a los datos obsoletos, es útil recordar que la sesión no tiene una forma integrada de saber cuándo se cambia la base de datos fuera de la sesión. Para acceder a datos modificados externamente a través de instancias existentes, las instancias deben marcarse como caducadas . Esto sucede de forma predeterminada en session.commit() , pero se puede hacer manualmente llamando a session.expire_all() o session.expire(instance) . Un ejemplo (SQL omitido):

 x = Foo(bar=1) session.add(x) session.commit() print x.bar # 1 foo.update().execute(bar=42) print x.bar # 1 session.expire(x) print x.bar # 42 

session.commit() expira x , por lo que la primera statement de impresión abre implícitamente una nueva transacción y vuelve a consultar los atributos de x . Si comenta la primera statement de impresión, notará que la segunda ahora toma el valor correcto, ya que la nueva consulta no se emite hasta después de la actualización.

Esto tiene sentido desde el punto de vista del aislamiento transaccional: solo debe seleccionar modificaciones externas entre transacciones. Si esto le está causando problemas, sugeriría aclarar o repensar los límites de transacción de su aplicación en lugar de buscar inmediatamente session.expire_all() .

Los documentos de sqlalchemy tienen una excelente reseña del rendimiento de varias técnicas que se pueden usar para inserciones masivas:

Básicamente, los ORM no están diseñados para inserciones masivas de alto rendimiento; esta es la razón por la que SQLAlchemy ofrece el Core además del ORM como un componente de primera clase.

Para el caso de uso de inserciones masivas rápidas, el sistema de generación y ejecución de SQL que el ORM construye encima forma parte del Core. Al usar este sistema directamente, podemos producir un INSERT que compita con el uso directo de la base de datos en bruto de la API.

Alternativamente, el ORM de SQLAlchemy ofrece el conjunto de métodos de Operaciones a Granel, que proporcionan enlaces a las subsecciones de la unidad de proceso de trabajo para emitir construcciones INSERT y UPDATE de nivel básico con un pequeño grado de automatización basada en ORM.

El siguiente ejemplo ilustra las pruebas basadas en el tiempo para varios métodos diferentes de inserción de filas, yendo de lo más automatizado a lo menos. Con cPython 2.7, se observaron tiempos de ejecución:

 classics-MacBook-Pro:sqlalchemy classic$ python test.py SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs sqlite3: Total time for 100000 records 0.487842082977 sec 

Guión:

 import time import sqlite3 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import scoped_session, sessionmaker Base = declarative_base() DBSession = scoped_session(sessionmaker()) engine = None class Customer(Base): __tablename__ = "customer" id = Column(Integer, primary_key=True) name = Column(String(255)) def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'): global engine engine = create_engine(dbname, echo=False) DBSession.remove() DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) def test_sqlalchemy_orm(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer() customer.name = 'NAME ' + str(i) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_pk_given(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer(id=i+1, name="NAME " + str(i)) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM pk given: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_bulk_insert(n=100000): init_sqlalchemy() t0 = time.time() n1 = n while n1 > 0: n1 = n1 - 10000 DBSession.bulk_insert_mappings( Customer, [ dict(name="NAME " + str(i)) for i in xrange(min(10000, n1)) ] ) DBSession.commit() print( "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_core(n=100000): init_sqlalchemy() t0 = time.time() engine.execute( Customer.__table__.insert(), [{"name": 'NAME ' + str(i)} for i in xrange(n)] ) print( "SQLAlchemy Core: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def init_sqlite3(dbname): conn = sqlite3.connect(dbname) c = conn.cursor() c.execute("DROP TABLE IF EXISTS customer") c.execute( "CREATE TABLE customer (id INTEGER NOT NULL, " "name VARCHAR(255), PRIMARY KEY(id))") conn.commit() return conn def test_sqlite3(n=100000, dbname='sqlite3.db'): conn = init_sqlite3(dbname) c = conn.cursor() t0 = time.time() for i in xrange(n): row = ('NAME ' + str(i),) c.execute("INSERT INTO customer (name) VALUES (?)", row) conn.commit() print( "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec") if __name__ == '__main__': test_sqlalchemy_orm(100000) test_sqlalchemy_orm_pk_given(100000) test_sqlalchemy_orm_bulk_insert(100000) test_sqlalchemy_core(100000) test_sqlite3(100000) 

Usualmente lo hago usando add_all .

 from app import session from models import User objects = [User(name="u1"), User(name="u2"), User(name="u3")] session.add_all(objects) session.commit() 

Se agregó soporte directo a SQLAlchemy a partir de la versión 0.8

Según los documentos , connection.execute(table.insert().values(data)) debería hacer el truco. (Tenga en cuenta que esto no es lo mismo que connection.execute(table.insert(), data) que da como resultado muchos inserciones de fila individuales a través de una llamada a executemany ). En cualquier cosa que no sea una conexión local, la diferencia en el rendimiento puede ser enorme.

SQLAlchemy introdujo eso en la versión 1.0.0 :

Operaciones masivas – documentos de SQLAlchemy

¡Con estas operaciones, ahora puede hacer inserciones masivas o actualizaciones!

Por ejemplo (si desea la sobrecarga más baja para los INSERT de tabla simples), puede usar Session.bulk_insert_mappings() :

 loadme = [ (1, 'a') , (2, 'b') , (3, 'c') ] dicts = [] for i in range(len(loadme)): dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1])) s = Session() s.bulk_insert_mappings(Foo, dicts) s.commit() 

O, si lo desea, omita las tuplas de loadme y escriba los diccionarios directamente en dicts (pero me resulta más fácil dejar todo el vocabulario fuera de los datos y cargar una lista de diccionarios en un bucle).

La respuesta de Piere es correcta, pero un problema es que bulk_save_objects de forma predeterminada no devuelve las claves principales de los objetos, si eso le preocupa. Establezca return_defaults en True para obtener este comportamiento.

La documentación está aquí .

 foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')] session.bulk_save_objects(foos, return_defaults=True) for foo in foos: assert foo.id is not None session.commit() 

Esta es una manera:

 values = [1, 2, 3] Foo.__table__.insert().execute([{'bar': x} for x in values]) 

Esto se insertará así:

 INSERT INTO `foo` (`bar`) VALUES (1), (2), (3) 

Referencia: Las preguntas frecuentes de SQLAlchemy incluyen puntos de referencia para varios métodos de confirmación.

Todos los caminos llevan a Roma , pero algunos de ellos atraviesan montañas, requieren transbordadores, pero si desea llegar rápidamente, simplemente tome la autopista.


En este caso, la autopista debe utilizar la función execute_batch () de psycopg2 . La documentación lo dice lo mejor:

La implementación actual de executemany() es (con una subestimación extremadamente caritativa) no está teniendo un rendimiento particular. Estas funciones se pueden utilizar para acelerar la ejecución repetida de una statement contra un conjunto de parámetros. Al reducir la cantidad de viajes de ida y vuelta al servidor, el rendimiento puede ser de órdenes de magnitud mejor que el uso de executemany() .

En mi propia prueba, execute_batch() es aproximadamente el doble de rápido que executemany() , y ofrece la opción de configurar el tamaño de página para ajustes adicionales (si desea exprimir el último 2-3% de rendimiento del controlador).

La misma característica se puede habilitar fácilmente si está utilizando SQLAlchemy configurando use_batch_mode=True como un parámetro cuando use_batch_mode=True una instancia del motor con create_engine()

La mejor respuesta que encontré hasta ahora fue en la documentación de sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Hay un ejemplo completo de un punto de referencia de posibles soluciones.

Como se muestra en la documentación:

bulk_save_objects no es la mejor solución pero su rendimiento es correcto.

La segunda mejor implementación en términos de legibilidad creo que fue con SQLAlchemy Core:

 def test_sqlalchemy_core(n=100000): init_sqlalchemy() t0 = time.time() engine.execute( Customer.__table__.insert(), [{"name": 'NAME ' + str(i)} for i in xrange(n)] ) 

El contexto de esta función se da en el artículo de documentación.