(PySpark) Listas anidadas después de reduceByKey

Estoy seguro de que esto es algo muy simple, pero no encontré nada relacionado con esto.

Mi código es simple:

... stream = stream.map(mapper) stream = stream.reduceByKey(reducer) ... 

Nada extraordinario. Después de todo, la salida se ve así:

 ... key1 value1 key2 [value2, value3] key3 [[value4, value5], value6] ... 

Y así. Entonces, a veces tengo un valor fijo (si es soltero). A veces: listas anidadas que pueden ser muy, muy profundas (en mis datos de prueba simple, tenía 3 niveles de profundidad).

Intenté buscar en las fonts algo como ‘plano’, pero solo encontré el método flatMap que no es (como yo lo entiendo) lo que necesito.

No sé por qué esas listas están anidadas. Mi conjetura es que fueron manejados por diferentes procesos (¿trabajadores?) Y luego se unieron sin aplanamiento.

Por supuesto, puedo escribir un código en Python que desplegará esa lista y la aplanará. Pero creo que esta no es una situación normal, creo que casi todo el mundo necesita una producción plana.

itertools.chain deja de desplegarse en el puño y encuentra un valor no iterable. En otras palabras, todavía necesita algo de encoding (párrafo anterior).

Entonces, ¿cómo aplanar la lista utilizando los métodos nativos de PySpark?

Gracias

El problema aquí es su función de reducción. Para cada tecla, reduceByKey llama a su función de reducción con pares de valores y espera que produzca valores combinados del mismo tipo.

Por ejemplo, digamos que quería realizar una operación de conteo de palabras. Primero, puedo asignar cada palabra a un par (word, 1) , luego puedo reduceByKey(lambda x, y: x + y) para resumir los conteos de cada palabra. Al final, me quedo con un RDD de pares (word, count) .

Aquí hay un ejemplo de la documentación de la API de PySpark :

 >>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)] 

Para comprender por qué su ejemplo no funcionó, puede imaginar que la función de reducción se aplica de esta forma:

 reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ... 

Según su función de reducción, parece que podría estar intentando implementar la operación groupByKey , que agrupa cada clave con una lista de sus valores.

Además, eche un vistazo a combineByKey , una generalización de reduceByKey() que permite que los tipos de entrada y salida de la función reduce difieran ( reduceByKey se implementa en términos de combineByKey )

Alternativamente, stream.groupByKey().mapValues(lambda x: list(x)).collect() da

 key1 [value1] key2 [value2, value3] key3 [value4, value5, value6]