Indexa un dataframe de pandas en Elasticsearch sin elasticsearch-py

Me gustaría indexar un montón de grandes marcos de datos de pandas (algunos millones de filas y 50 columnas) en Elasticsearch.

Al buscar ejemplos sobre cómo hacer esto, la mayoría de la gente usará el método de ayuda masiva de elasticsearch-py , pasándole una instancia de la clase Elasticsearch que maneja la conexión, así como una lista de diccionarios que se crea con el dataframe.to_dict de pandas ( orientar = ‘registros’) método . Los metadatos se pueden insertar en el dataframe de antemano como nuevas columnas, por ejemplo, df['_index'] = 'my_index' etc.

Sin embargo, tengo razones para no usar la biblioteca elasticsearch-py y me gustaría hablar directamente con la API masiva de Elasticsearch , por ejemplo, a través de solicitudes u otra biblioteca HTTP conveniente. Además, df.to_dict() es muy lento en grandes marcos de datos, desafortunadamente, y convertir un dataframe en una lista de dictados que luego se serializa a JSON por elasticsearch-py suena como una sobrecarga innecesaria cuando hay algo como dataframe.to_json () que Es bastante rápido incluso en grandes marcos de datos.

¿Cuál sería un enfoque fácil y rápido para obtener un dataframe de pandas en el formato requerido por la API masiva? Creo que un paso en la dirección correcta es usar dataframe.to_json() siguiente manera:

 import pandas as pd df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}]) df ab 0 1 2 1 3 4 2 5 6 df.to_json(orient='records', lines=True) '{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}' 

Esta es ahora una cadena JSON separada por una nueva línea, sin embargo, aún falta los metadatos. ¿Cuál sería una manera de lograrlo allí?

edición: para completar, un documento JSON de metadatos se vería así:

 {"index": {"_index": "my_index", "_type": "my_type"}} 

Por lo tanto, al final, todo el JSON esperado por la API masiva se vería así (con un salto de línea adicional después de la última línea):

 {"index": {"_index": "my_index", "_type": "my_type"}} {"a":1,"b":2} {"index": {"_index": "my_index", "_type": "my_type"}} {"a":3,"b":4} {"index": {"_index": "my_index", "_type": "my_type"}} {"a":5,"b":6} 

Mientras tanto, descubrí múltiples posibilidades de cómo hacerlo con al menos una velocidad razonable:

 import json import pandas as pd import requests # df is a dataframe or dataframe chunk coming from your reading logic df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id df_as_json = df.to_json(orient='records', lines=True) final_json_string = '' for json_document in df_as_json.split('\n'): jdict = json.loads(json_document) metadata = json.dumps({'index': {'_id': jdict['_id']}}) jdict.pop('_id') final_json_string += metadata + '\n' + json.dumps(jdict) + '\n' headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60) 

En lugar de usar el método to_json() pandas, también se podría usar to_dict() siguiente manera. Esto fue un poco más lento en mis pruebas pero no mucho:

 dicts = df.to_dict(orient='records') final_json_string = '' for document in dicts: metadata = {"index": {"_id": document["_id"]}} document.pop('_id') final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n' 

Al ejecutar esto en grandes conjuntos de datos, uno puede ahorrar un par de minutos reemplazando la biblioteca json predeterminada de Python con ujson o rapidjson a través de la instalación, luego import ujson as json o import rapidjson as json , respectivamente.

Se puede lograr una aceleración aún mayor al reemplazar la ejecución secuencial de los pasos con uno paralelo para que la lectura y la conversión no se detengan mientras las solicitudes esperan que Elasticsearch procese todos los documentos y devuelva una respuesta. Esto podría hacerse a través de subprocesos, multiprocesamiento, asyncio, colas de tareas, … pero esto está fuera del scope de esta pregunta.

Si encuentra un método para hacer la conversión a json aún más rápido, avíseme.

Esta función inserta un dataframe de pandas en la búsqueda elástica (trozo por trozo)

 def insertDataframeIntoElastic(dataFrame,index='index', typ = 'test', server = 'http://localhost:9200', chunk_size = 2000): headers = {'content-type': 'application/x-ndjson', 'Accept-Charset': 'UTF-8'} records = dataFrame.to_dict(orient='records') actions = ["""{ "index" : { "_index" : "%s", "_type" : "%s"} }\n""" % (index, typ) +json.dumps(records[j]) for j in range(len(records))] i=0 while i