Chispa – Operación RDD anidada

Tengo dos RDD dicen

rdd1 = id | created | destroyed | price 1 | 1 | 2 | 10 2 | 1 | 5 | 11 3 | 2 | 3 | 11 4 | 3 | 4 | 12 5 | 3 | 5 | 11 rdd2 = [1,2,3,4,5] # lets call these value as timestamps (ts) 

rdd2 se genera básicamente mediante el rango (valor de valor inicial, valor final, intervalo). Los parámetros aquí pueden variar. El tamaño puede ser igual o diferente a rdd1. La idea es obtener registros de rdd1 en rdd2 en función de los valores de rdd2 mediante un filtro de criertia (los registros de rdd1 se pueden repetir mientras se recuperan como se puede ver en la salida)

criterios de filtrado rdd1.created <= ts <rdd1.destroyed)

Rendimiento esperado:

 ts | prices 1 | 10,11 # ie for ids 1,2 of rdd1 2 | 11,11 # ids 2,3 3 | 11,12,11 # ids 2,4,5 4 | 11,11 # ids 2,5 

Ahora quiero filtrar RDD1 en función de alguna condición que use las teclas de RDD2. (descrito anteriormente) Y devuelve los resultados que unen las claves de RDD2 y los resultados filtrados de RDD1

Así que hago:

 rdd2.map(lambda x : somefilterfunction(x, rdd1)) def somefilterfunction(x, rdd1): filtered_rdd1 = rdd1.filter(rdd1[1]  x) prices = filtered_rdd1.map(lambda x : x[3]) res = prices.collect() return (x, list(res)) 

Y me sale:

Excepción: parece que está intentando transmitir un RDD o hacer referencia a un RDD desde una acción o transformación. Las transformaciones y acciones RDD solo pueden ser invocadas por el controlador, no dentro de otras transformaciones; por ejemplo, rdd1.map (lambda x: rdd2.values.count () * x) no es válido porque la transformación de valores y la acción de conteo no se pueden realizar dentro de la transformación rdd1.map. Para más información, vea SPARK-5063.

Intenté usar groupBy, pero ya que aquí los elementos de rdd1 pueden repetirse una y otra vez en comparación con la agrupación que entiendo que podría agrupar cada elemento de rdd1 en una ranura en particular una sola vez.

La única forma ahora es usar un bucle normal para hacer el filtrado y unir todo al final.

¿Alguna sugerencia?

Dado que utiliza el rango normal, no hay ninguna razón para crear un segundo RDD. Simplemente puede generar valores en un rango específico para cada registro:

 from __future__ import division # Required only for Python 2.x from math import ceil from itertools import takewhile rdd1 = sc.parallelize([ (1, 1, 2, 10), (2, 1, 5, 11), (3, 2, 3, 11), (4, 3, 4, 12), (5, 3, 5, 11), ]) def generate(start, end, step): def _generate(id, created, destroyed, price): # Smallest ts >= created start_for_record = int(ceil((created - start) / step) * step + start) rng = takewhile( lambda x: created <= x < destroyed, xrange(start_for_record, end, step)) # In Python 3.x use range for i in rng: yield i, price return _generate result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey() 

Y el resultado:

 result.mapValues(list).collect() ## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]