Pyspark: uso de repartitionAndSortWithinPartitions con múltiples clasificaciones Critiria

Suponiendo que estoy teniendo el siguiente RDD:

rdd = sc.parallelize([('a', (5,1)), ('d', (8,2)), ('2', (6,3)), ('a', (8,2)), ('d', (9,6)), ('b', (3,4)),('c', (8,3))]) 

¿Cómo puedo usar repartitionAndSortWithinPartitions y ordenar por x [0] y después de x [1] [0]? Usando lo siguiente, ordeno solo por la clave (x [0]):

 Npartitions = sc.defaultParallelism rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: hash(x) % Npartitions, 2) 

Una forma de hacerlo es la siguiente, pero debería haber algo más simple, supongo:

 Npartitions = sc.defaultParallelism partitioned_data = rdd .partitionBy(2) .map(lambda x:(x[0],x[1][0],x[1][1])) .toDF(['letter','number2','number3']) .sortWithinPartitions(['letter','number2'],ascending=False) .map(lambda x:(x.letter,(x.number2,x.number3))) >>> partitioned_data.glom().collect() [[], [(u'd', (9, 6)), (u'd', (8, 2))], [(u'c', (8, 3)), (u'c', (6, 3))], [(u'b', (3, 4))], [(u'a', (8, 2)), (u'a', (5, 1))] 

Como se puede ver, tengo que convertirlo en Dataframe para poder usar sortWithinPartitions . ¿Hay otra manera? Usando repartitionAndSortWIthinPartitions ?

(No importa que los datos no se clasifiquen globalmente. Me importa que solo se clasifiquen dentro de las particiones).

Es posible, pero deberá incluir toda la información requerida en la clave compuesta:

 from pyspark.rdd import portable_hash n = 2 def partitioner(n): """Partition by the first item in the key tuple""" def partitioner_(x): return portable_hash(x[0]) % n return partitioner_ (rdd .keyBy(lambda kv: (kv[0], kv[1][0])) # Create temporary composite key .repartitionAndSortWithinPartitions( numPartitions=n, partitionFunc=partitioner(n), ascending=False) .map(lambda x: x[1])) # Drop key (note: there is no partitioner set anymore) 

Explicado paso a paso:

  • keyBy(lambda kv: (kv[0], kv[1][0])) crea una clave sustituta que consiste en la clave original y el primer elemento del valor. En otras palabras se transforma:

     (0, (5,1)) 

    dentro

     ((0, 5), (0, (5, 1))) 

    En la práctica, puede ser un poco más eficiente simplemente remodelar los datos para

     ((0, 5), 1) 
  • partitioner define la función de partición basada en un hash del primer elemento de la clave, por lo que:

     partitioner(7)((0, 5)) ## 0 partitioner(7)((0, 6)) ## 0 partitioner(7)((0, 99)) ## 0 partitioner(7)((3, 99)) ## 3 

    Como puedes ver, es consistente e ignora el segundo bit.

  • usamos la función predeterminada de función keyfunc que es identidad ( lambda x: x ) y dependemos del orden lexicográfico definido en la tuple Python:

     (0, 5) < (1, 5) ## True (0, 5) < (0, 4) ## False 

Como se mencionó anteriormente, podría cambiar la forma de los datos:

 rdd.map(lambda kv: ((kv[0], kv[1][0]), kv[1][1])) 

y soltar el map final para mejorar el rendimiento.