Cargar Spark RDD a Neo4j en Python

Estoy trabajando en un proyecto en el que estoy usando el procesamiento de Spark for Data. Mis datos ahora están procesados ​​y necesito cargar los datos en Neo4j . Después de cargar en Neo4j, lo usaré para mostrar los resultados.

Quería que toda la implementación se hiciera en la progtwigción de Python . Pero no pude encontrar ninguna biblioteca o ejemplo en la red. ¿Puede por favor ayudar con enlaces o las bibliotecas o cualquier ejemplo.

Mi RDD es un PairdRDD. Y en cada tupla, tengo que crear una relación.
PairdRDD

Key Value Jack [a,b,c] 

Para propósitos de simplicidad, transformé el RDD a

  Key value Jack a Jack b Jack c 

Entonces tengo que crear relaciones entre

  Jack->a Jack->b Jack->c 

Según la respuesta de William, puedo cargar una lista directamente. Pero estos datos están arrojando el error de cifrado.

Intenté así:

  def writeBatch(b): print("writing batch of " + str(len(b))) session = driver.session() session.run('UNWIND {batch} AS elt MERGE (n:user1 {user: elt[0]})', {'batch': b}) session.close() def write2neo(v): batch_d.append(v) for hobby in v[1]: batch_d.append([v[0],hobby]) global processed processed += 1 if len(batch) >= 500 or processed >= max: writeBatch(batch) batch[:] = [] max = userhobbies.count() userhobbies.foreach(write2neo) 

b es la lista de listas. Elt desenrollado es una lista de dos elementos elt [0], elt [1] como clave y valores.

Error

 ValueError: Structure signature must be a single byte value 

Gracias de antemano.

Puedes hacer un foreach en tu RDD, ejemplo:

 from neo4j.v1 import GraphDatabase, basic_auth driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("",""), encrypted=False) from pyspark import SparkContext sc = SparkContext() dt = sc.parallelize(range(1, 5)) def write2neo(v): session = driver.session() session.run("CREATE (n:Node {value: {v} })", {'v': v}) session.close() dt.foreach(write2neo) 

Sin embargo, me gustaría mejorar la función para agrupar las escrituras, pero este simple fragmento de código funciona para la implementación básica

ACTUALIZACIÓN CON EJEMPLO DE ESCRITOS POR LOTES

 sc = SparkContext() batch = [] max = None processed = 0 def writeBatch(b): print("writing batch of " + str(len(b))) session = driver.session() session.run('UNWIND {batch} AS elt CREATE (n:Node {v: elt})', {'batch': b}) session.close() def write2neo(v): batch.append(v) global processed processed += 1 if len(batch) >= 500 or processed >= max: writeBatch(batch) batch[:] = [] dt = sc.parallelize(range(1, 2136)) max = dt.count() dt.foreach(write2neo) 

– Que resultados con

 16/09/15 12:25:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) writing batch of 500 writing batch of 500 writing batch of 500 writing batch of 500 writing batch of 135 16/09/15 12:25:47 INFO PythonRunner: Times: total = 279, boot = -103, init = 245, finish = 137 16/09/15 12:25:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1301 bytes result sent to driver 16/09/15 12:25:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 294 ms on localhost (1/1) 16/09/15 12:25:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 16/09/15 12:25:47 INFO DAGScheduler: ResultStage 1 (foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36) finished in 0.295 s 16/09/15 12:25:47 INFO DAGScheduler: Job 1 finished: foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36, took 0.308263 s