Rápido o Bulk Upsert en pymongo

¿Cómo puedo hacer un aumento de volumen en pymongo? Quiero actualizar un montón de entradas y hacerlas una a la vez es muy lento.

La respuesta a una pregunta casi idéntica está aquí: ¿ Actualización masiva / upsert en MongoDB?

La respuesta aceptada en realidad no responde a la pregunta. Simplemente proporciona un enlace a la CLI de mongo para realizar importaciones / exportaciones.

También estaría abierto a alguien que explique por qué no es posible / no es una buena práctica hacer una subida masiva, pero por favor explique cuál es la solución preferida para este tipo de problema.

MongoDB 2.6+ tiene soporte para operaciones masivas. Esto incluye inserciones masivas, actualizaciones, actualizaciones, etc. El objective de esto es reducir / eliminar los retrasos de la latencia de ida y vuelta de las operaciones de registro por registro (“documento por documento” para que sea correcto).

¿Entonces, cómo funciona esto? Ejemplo en Python, porque en eso estoy trabajando.

>>> import pymongo >>> pymongo.version '2.7rc0' 

Para usar esta función, creamos un objeto ‘masivo’, le agregamos documentos, luego ejecutamos la ejecución y enviaremos todas las actualizaciones a la vez. Advertencias: El tamaño BSON de las operaciones recostackdas (sum de los bsonsizes) no puede superar el límite de tamaño de documento de 16 MB. Por supuesto, la cantidad de operaciones puede variar significativamente, su millaje puede variar.

Ejemplo en Pymongo de la operación de inserción de Bulk:

 import pymongo conn = pymongo.MongoClient('myserver', 8839) db = conn['mydbname'] coll = db.myCollection bulkop = coll.initialize_ordered_bulk_op() retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}}) retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}}) retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}}) retval = bulkop.execute() 

Este es el método esencial. Más información disponible en:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

Edición: – desde la versión 3.5 del controlador python, initialize_ordered_bulk_op está en desuso. Utilice bulk_write () en su lugar. [ http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write ]

Las versiones modernas de pymongo (mayor que 3.x) envuelven las operaciones masivas en una interfaz consistente que reduce las calificaciones en donde la versión del servidor no admite operaciones masivas. Esto es ahora consistente en los controladores soportados oficialmente por MongoDB.

Por lo tanto, el método preferido para la encoding es usar bulk_write() lugar, donde usa otro UpdateOne otra acción de operación apropiada en su lugar. Y ahora, por supuesto, se prefiere utilizar las listas de lenguaje natural en lugar de un constructor específico

La traducción directa de la vieja documentación:

 from pymongo import UpdateOne operations = [ UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True), UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True), UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True) ] result = collection.bulk_write(operations) 

O el clásico bucle de transformación de documentos:

 import random from pymongo import UpdateOne random.seed() operations = [] for doc in collection.find(): # Set a random number on every document update operations.append( UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } }) ) # Send once every 1000 in batch if ( len(operations) == 1000 ): collection.bulk_write(operations,ordered=False) operations = [] if ( len(operations) > 0 ): collection.bulk_write(operations,ordered=False) 

El resultado devuelto es de BulkWriteResult que contendrá contadores de documentos coincidentes y actualizados, así como los valores de _id devueltos para cualquier “actualización” que ocurra.

Hay un poco de una idea errónea sobre el tamaño de la matriz de operaciones masivas. La solicitud real que se envió al servidor no puede exceder el límite de BSON de 16 MB, ya que ese límite también se aplica a la “solicitud” enviada al servidor que también utiliza el formato BSON.

Sin embargo, eso no gobierna el tamaño de la matriz de solicitud que puede generar, ya que las operaciones reales solo se enviarán y procesarán en lotes de 1000 de todos modos. La única restricción real es que esas 1000 instrucciones de operación en sí mismas no crean un documento BSON mayor a 16MB. Que es de hecho una orden bastante alta.

El concepto general de métodos masivos es “menos tráfico”, como resultado de enviar muchas cosas a la vez y solo tratar con una respuesta del servidor. La reducción de esa sobrecarga asociada a cada solicitud de actualización ahorra mucho tiempo.

La respuesta sigue siendo la misma: no hay soporte para actualizaciones masivas.

Puede actualizar todos los documentos que coincidan con la especificación de su consulta usando multi = True.

Hay un error aquí acerca de hacer un lote de comandos de la forma que desee.

La actualización masiva más rápida con Python 3.5+, motor y asyncio:

 import asyncio import datetime import logging import random import time import motor.motor_asyncio import pymongo.errors async def execute_bulk(bulk): try: await bulk.execute() except pymongo.errors.BulkWriteError as err: logging.error(err.details) async def main(): cnt = 0 bulk = db.initialize_unordered_bulk_op() tasks = [] async for document in db.find({}, {}, no_cursor_timeout=True): cnt += 1 bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}}) if not cnt % 1000: task = asyncio.ensure_future(execute_bulk(bulk)) tasks.append(task) bulk = db.initialize_unordered_bulk_op() if cnt % 1000: task = asyncio.ensure_future(bulk.execute(bulk)) tasks.append(task) logging.info('%s processed', cnt) await asyncio.gather(*tasks) logging.basicConfig(level='INFO') db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection'] start_time = time.time() loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: execution_time = time.time() - start_time logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time)) 

Si tiene muchos datos y desea usar “_id” para juzgar si existen datos,

puedes probar…

 import pymongo from pymongo import UpdateOne client = pymongo.MongoClient('localhost', 27017) db=client['sampleDB'] collectionInfo = db.sample #sample data datas=[ {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33}, {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33}, {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33}, {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33}, {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33}, {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33} ] #you should split judge item and other data ids=[data.pop("_id") for data in datas] operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)] collectionInfo.bulk_write(operations) 

Mi inglés es muy pobre, si no puedes entender lo que digo, lo siento.