Cómo utilizar elasticsearch.helpers.streaming_bulk

Alguien puede recomendar cómo usar la función elasticsearch.helpers.streaming_bulk en lugar de elasticsearch.helpers.bulk para indexar datos en elasticsearch.

Si simplemente cambio streaming_bulk en lugar de en bloque, nada se indexa, por lo que supongo que se debe utilizar de forma diferente.

El código a continuación crea datos de índice, tipo e índice del archivo CSV en trozos de 500 elementos en elasticsearch. Está funcionando correctamente, pero estoy vagando, ¿es posible boost el rendimiento? Es por eso que quiero probar la función streaming_bulk.

Actualmente necesito 10 minutos para indexar 1 millón de filas para un documento CSV de 200 MB. Uso dos máquinas, Centos 6.6 con 8 CPU-s, x86_64, CPU MHz: 2499.902, Mem: 15.574G en total. No estoy seguro de que pueda ir más rápido.

es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}]) index_name = 'new_index' type_name = 'new_type' mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file es.indices.create(index_name) es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping) with open(file_to_index, 'rb') as csvfile: reader = csv.reader(csvfile) #read documents for indexing from CSV file, more than million rows content = {"_index": index_name, "_type": type_name} batch_chunks = [] iterator = 0 for row in reader: var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment) id_increment = id_increment + 1 #var = transform_row_for_indexing(row,fields, index_name, type_name) batch_chunks.append(var) if iterator % 500 == 0: helpers.bulk(es,batch_chunks) del batch_chunks[:] print "ispucalo batch" iterator = iterator + 1 # indexing of last batch_chunk if len(batch_chunks) != 0: helpers.bulk(es,batch_chunks) 

Así que la transmisión masiva devuelve un interator. Lo que significa que no pasará nada hasta que comiences a iterar sobre él. El código para la función ‘bulk’ se ve así:

 success, failed = 0, 0 # list of errors to be collected is not stats_only errors = [] for ok, item in streaming_bulk(client, actions, **kwargs): # go through request-reponse pairs and detect failures if not ok: if not stats_only: errors.append(item) failed += 1 else: success += 1 return success, failed if stats_only else errors 

Básicamente, solo llamar a streaming_bulk (cliente, acciones, ** kwargs) no hará nada. No es hasta que lo repitas, como se hace en este bucle for, que la indexación realmente comienza a suceder.

Así que en su código. Le invitamos a cambiar ‘bulk’ a ‘streaming_bulk’; sin embargo, debe recorrer los resultados de la transmisión en masa para que, en realidad, tenga algo indexado.

streaming_bulk consume un iterador de actions y produce una respuesta para cada acción. Por lo tanto, primero debería escribir un iterador simple sobre sus documentos como este:

 def document_stream(file_to_index): with open(file_to_index, "rb") as csvfile: for row in csv.reader(csvfile): yield {"_index": index_name, "_type": type_name, "_source": transform_row(row) } 

Y luego hacer la inserción masiva en streaming.

 stream = document_stream(file_to_index) for ok, response in streaming_bulk(es, actions = stream): if not ok: # failure inserting print response