Preparar mi bigdata con Spark a través de Python

Mis 100m de tamaño, datos cuantificados:

(1424411938', [3885, 7898]) (3333333333', [3885, 7898]) 

Resultado deseado:

 (3885, [3333333333, 1424411938]) (7898, [3333333333, 1424411938]) 

Entonces, lo que quiero es transformar los datos para que agrupe 3885 (por ejemplo) con todos los data[0] que los tienen). Aquí está lo que hice en python :

 def prepare(data): result = [] for point_id, cluster in data: for index, c in enumerate(cluster): found = 0 for res in result: if c == res[0]: found = 1 if(found == 0): result.append((c, [])) for res in result: if c == res[0]: res[1].append(point_id) return result 

pero cuando mapPartitions() ‘ed data RDD con prepare() , parece que hace lo que quiero solo en la partición actual, por lo tanto, devolver un resultado más grande que el deseado.

Por ejemplo, si el primer registro en el inicio estaba en la primera partición y el segundo en la segunda, entonces obtendría como resultado:

     (3885, [3333333333]) (7898, [3333333333]) (3885, [1424411938]) (7898, [1424411938]) 

    ¿Cómo modificar mi prepare() para obtener el efecto deseado? Alternativamente, ¿cómo procesar el resultado que prepare() produce, para que pueda obtener el resultado deseado?


    Como ya habrás notado en el código, no me importa la velocidad en absoluto.

    Aquí hay una forma de crear los datos:

     data = [] from random import randint for i in xrange(0, 10): data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000)))) data = sc.parallelize(data) 

    Puedes usar un montón de transformaciones básicas de pyspark para lograr esto.

     >>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])]) >>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1])) 

    Usamos flatMap para tener una clave, un par de valores para cada elemento en x[1] y cambiamos el formato de línea de datos a (a, x[0]) , el a aquí es cada elemento en x[1] . Para entender mejor flatMap puede consultar la documentación.

     >>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1]))) 

    Acabamos de agrupar todas las claves, pares de valores por sus claves y usamos la función de tupla para convertirlo en iterable.

     >>> r2.collect() [(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))] 

    Como dijiste, puedes usar [: 150] para tener los primeros 150 elementos, supongo que este sería un uso adecuado:

    r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])[:150]))

    Intenté ser lo más explicativo posible. Espero que esto ayude.