¿Cómo agrupar por teclas múltiples en chispa?

Tengo un montón de tuplas que están en forma de claves compuestas y valores. Por ejemplo,

tfile.collect() = [(('id1','pd1','t1'),5.0), (('id2','pd2','t2'),6.0), (('id1','pd1','t2'),7.5), (('id1','pd1','t3'),8.1) ] 

Quiero realizar operaciones de tipo sql en esta colección, donde puedo agregar la información basada en id [1..n] o pd [1..n]. Quiero implementar el uso de apis de vanilla pyspark y no usar SQLContext. En mi implementación actual, estoy leyendo un montón de archivos y fusionando el RDD.

 def readfile(): fr = range(6,23) tfile = sc.union([sc.textFile(basepath+str(f)+".txt") .map(lambda view: set_feature(view,f)) .reduceByKey(lambda a, b: a+b) for f in fr]) return tfile 

Tengo la intención de crear una matriz agregada como un valor. Por ejemplo,

 agg_tfile = [((id1,pd1),[5.0,7.5,8.1])] 

donde 5.0,7.5,8.1 representa [t1, t2, t3]. Actualmente estoy logrando lo mismo con el código de vainilla python usando diccionarios. Funciona bien para conjuntos de datos más pequeños. Pero me preocupa ya que esto no puede escalar para conjuntos de datos más grandes. ¿Hay alguna manera eficiente de lograr lo mismo usando pyspark apis?

Mi conjetura es que usted quiere transponer los datos de acuerdo a múltiples campos.

Una forma sencilla es concatenar los campos de destino que agrupará y convertirlo en una clave en un RDD emparejado. Por ejemplo:

 lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1']) rdd = lines.map(lambda x: x.split(',')).map(lambda x: (x[0] + ', ' + x[1], x[3])).reduceByKey(lambda a, b: a + ', ' + b) print rdd.collect() 

Entonces obtendrás el resultado transpuesto.

 [('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')] 

Agrupé ((id1, t1), ((p1,5.0), (p2,6.0)) y así sucesivamente … como mi función de mapa. Más tarde, reduzco utilizando map_group que crea una matriz para [p1, p2,. .] y llena valores en sus respectivas posiciones.

 def map_group(pgroup): x = np.zeros(19) x[0] = 1 value_list = pgroup[1] for val in value_list: fno = val[0].split('.')[0] x[int(fno)-5] = val[1] return x tgbr = tfile.map(lambda d: ((d[0][0],d[0][2]),[(d[0][1],d[1])])) \ .reduceByKey(lambda p,q:p+q) \ .map(lambda d: (d[0], map_group(d))) 

Esto se siente como una solución costosa en términos de computación. Pero funciona por ahora.