pymongo: eliminar duplicados (¿reducir mapa?)

Tengo una base de datos con varias colecciones (en general ~ 15mil documentos) y los documentos se ven así (simplificado):

{'Text': 'blabla', 'ID': 101} {'Text': 'Whuppppyyy', 'ID': 102} {'Text': 'Abrakadabraaa', 'ID': 103} {'Text': 'olalalaal', 'ID': 104} {'Text': 'test1234545', 'ID': 104} {'Text': 'whapwhapwhap', 'ID': 104} 

Todos ellos también tienen un campo _id único, pero quiero eliminar duplicados de acuerdo con otro campo (el campo de ID externo).

Primero, probé un enfoque muy manual con listas y luego las eliminé, pero el DB parece demasiado grande, toma mucho tiempo y no es práctico.

En segundo lugar, lo siguiente ya no funciona en las versiones actuales de MongoDB, aunque alguien lo sugiera.

 db.collection.ensureIndex( { ID: 1 }, { unique: true, dropDups: true } ) 

Entonces, ahora estoy tratando de crear una solución de reducción de mapas, pero realmente no sé qué estoy haciendo y, especialmente, tengo dificultades para usar otro campo (no la base de datos _id) para encontrar y eliminar duplicados. Aquí está mi primer enfoque malo (adoptado de alguna fuente interent):

 map = Code("function(){ if(this.fieldName){emit(this.fieldName,1);}}") reduce = Code("function(key,values) {return Array.sum(values);}") res = coll.map_reduce(map,reduce,"my_results"); response = [] for doc in res.find(): if(doc['value'] > 1): count = int(doc['value']) - 1 docs = col.find({"fieldName":doc['ID']},{'ID':1}).limit(count) for i in docs: response.append(i['ID']) coll.remove({"ID": {"$in": response}}) 

Cualquier ayuda para reducir cualquier duplicado en el campo de ID externo (dejando una entrada), sería muy apreciada;) ¡Gracias!

Un enfoque alternativo es utilizar el aggregation framework que tiene un mejor rendimiento que el mapa reducido. Considere la siguiente canalización de agregación que, como primera etapa de la canalización de agregación, el operador de $group agrupa los documentos por el campo de ID y almacena en el campo _id cada valor _id de los registros agrupados utilizando el operador $addToSet . El operador del acumulador $sum sum los valores de los campos que se le han pasado, en este caso la constante 1, contando así el número de registros agrupados en el campo de conteo. El otro paso de canalización $match filtra documentos con un recuento de al menos 2, es decir, duplicados.

Una vez que obtiene el resultado de la agregación, itera el cursor para eliminar el primer _id en el campo unique_ids , luego unique_ids el rest en una matriz que se usará más adelante para eliminar los duplicados (menos una entrada):

 cursor = db.coll.aggregate( [ {"$group": {"_id": "$ID", "unique_ids": {"$addToSet": "$_id"}, "count": {"$sum": 1}}}, {"$match": {"count": { "$gte": 2 }}} ] ) response = [] for doc in cursor: del doc["unique_ids"][0] for id in doc["unique_ids"]: response.append(id) coll.remove({"_id": {"$in": response}}) 

Primero, probé un enfoque muy manual con listas y luego las eliminé, pero el DB parece demasiado grande, toma mucho tiempo y no es práctico.

La mejor apuesta es utilizar el método .aggregate() que proporciona acceso a la canalización de agregación para encontrar los documentos que están duplicados. La primera etapa en la tubería es la etapa de $group donde agrupa sus documentos por la clave duplicada y luego utiliza los operadores $push y $sum acumulador que, respectivamente, devuelven una matriz de todos los _id para cada grupo y el recuento de elementos en el grupo. La etapa siguiente y la última en la canalización es la etapa de $match para devolver solo aquellos resultados donde haya una “ID” duplicada. A partir de ahí, itere el cursor y actualice cada documento con operaciones “masivas” .

 pipeline = [{'$group': {'_id': '$ID', 'count': {'$sum': 1}, 'ids': {'$push': '$_id'}}}, {'$match': {'count': {'$gte': 2}}}] bulk = db.collection.initialize_ordered_bulk_op() count = 0 for document in db.collection.aggregate(pipeline): it = iter(document['ids']) next(it) for id in it: bulk.find({'_id': id}).remove_one({'_id': id}) count = count + 1 if count % 1000 == 0: bulk.execute() if count > 0: bulk.execute() 

MongoDB 3.2 desaprueba Bulk() y sus métodos asociados, por lo que deberá utilizar el método bulk_write() para ejecutar su solicitud.

 from pymongo import DeleteOne request = [] for document in db.collection.aggregate(pipeline): it = iter(document['ids']) next(it) for id in it: requests.append(DeleteOne({'_id': id})) db.collection.bulk_write(requests) 

También puede hacer esto en el shell como se muestra en las respuestas aceptadas para eliminar dups de mongodb y ¿Cómo eliminar duplicados con una determinada condición en mongodb?