Cómo empujar un dataframe de Spark a la búsqueda elástica (Pyspark)

Beginner ES pregunta aquí

¿Cuál es el flujo de trabajo o los pasos para enviar un Spark Dataframe a Elastic Search?

De la investigación, creo que necesito usar el método spark.newAPIHadoopFile () .

Sin embargo, al profundizar en la documentación de Elastic Search y en otras preguntas y respuestas de la stack, todavía estoy un poco confundido sobre en qué formato deben estar los argumentos y por qué

Tenga en cuenta que estoy usando pyspark, esta es una nueva tabla para ES (no existe un índice ya), y el df es de 5 columnas (2 tipos de cadena, 2 tipos largos y 1 lista de entradas) con ~ 3.5M filas.

Esto me funcionó, tenía mis datos en df .

 df = df.drop('_id') df.write.format( "org.elasticsearch.spark.sql" ).option( "es.resource", '%s/%s' % (conf['index'], conf['doc_type']) ).option( "es.nodes", conf['host'] ).option( "es.port", conf['port'] ).save() 

“ `

/path/to/spark-submit --master spark://master:7077 --jars ./jar_files/elasticsearch-hadoop-5.6.4.jar --driver-class-path ./jar_files/elasticsearch-hadoop-5.6.4.jar main_df.py mi trabajo usando /path/to/spark-submit --master spark://master:7077 --jars ./jar_files/elasticsearch-hadoop-5.6.4.jar --driver-class-path ./jar_files/elasticsearch-hadoop-5.6.4.jar main_df.py comando /path/to/spark-submit --master spark://master:7077 --jars ./jar_files/elasticsearch-hadoop-5.6.4.jar --driver-class-path ./jar_files/elasticsearch-hadoop-5.6.4.jar main_df.py .

Logré encontrar una respuesta así que la compartiré. Los Spark DF (de pyspark.sql) no admiten actualmente los métodos newAPIHadoopFile() ; sin embargo, df.rdd.saveAsNewAPIHadoopFile() me estaba dando errores. El truco consistía en convertir el df a cadenas a través de la siguiente función

 def transform(doc): import json import hashlib _json = json.dumps(doc) keys = doc.keys() for key in keys: if doc[key] == 'null' or doc[key] == 'None': del doc[key] if not doc.has_key('id'): id = hashlib.sha224(_json).hexdigest() doc['id'] = id else: id = doc['id'] _json = json.dumps(doc) return (id, _json) 

Así que mi flujo de trabajo JSON es:

1: df = spark.read.json('XXX.json')

2: rdd_mapped = df.rdd.map(lambda y: y.asDict())

3: final_rdd = rdd_mapped.map(transform)

4:

 final_rdd.saveAsNewAPIHadoopFile( path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf={ "es.resource" : " / ", "es.mapping.id":"id", "es.input.json": "true", "es.net.http.auth.user":"elastic", "es.write.operation":"index", "es.nodes.wan.only":"false", "es.net.http.auth.pass":"changeme", "es.nodes":", , ...", "es.port":"9200" }) 

Puede encontrar más información sobre los argumentos de ES aquí ( vaya a “Configuración”)