Devuelve el RDD de los valores N más grandes de otro RDD en SPARK

Estoy tratando de filtrar un RDD de tuplas para devolver las tuplas N más grandes según los valores clave. Necesito que el formato de retorno sea un RDD.

Así que el RDD:

[(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')] 

Filtrado para las 3 claves más grandes debería devolver el RDD:

 [(6,'p'), (12,'e'), (49,'y')] 

Hacer un sortByKey() y luego take(N) devuelve los valores y no produce un RDD, por lo que no funcionará.

Podría devolver todas las claves, ordenarlas, encontrar el valor Nth más grande y luego filtrar el RDD para valores de clave mayores que eso, pero eso parece muy ineficiente.

Cuál sería la mejor forma de hacer esto?

Con RDD

Una solución rápida pero no particularmente eficiente es seguir sortByKey usar zipWithIndex y filter :

 n = 3 rdd = sc.parallelize([(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]) rdd.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys() 

Si n es relativamente pequeño en comparación con el tamaño de RDD, un enfoque un poco más eficiente es evitar la clasificación completa:

 import heapq def key(kv): return kv[0] top_per_partition = rdd.mapPartitions(lambda iter: heapq.nlargest(n, iter, key)) top_per_partition.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys() 

Si las claves son mucho más pequeñas que los valores y el orden de salida final no importa, el enfoque de filter puede funcionar bien:

 keys = rdd.keys() identity = lambda x: x offset = (keys .mapPartitions(lambda iter: heapq.nlargest(n, iter)) .sortBy(identity) .zipWithIndex() .filter(lambda xi: xi[1] < n) .keys() .max()) rdd.filter(lambda kv: kv[0] <= offset) 

Tampoco mantendrá n valores exactos en caso de empate.

Con DataFrames

Puedes simplemente orderBy y limit

 from pyspark.sql.functions import col rdd.toDF().orderBy(col("_1").desc()).limit(n) 

Un enfoque de menor esfuerzo ya que solo desea convertir los resultados de take(N) a un nuevo RDD.

 sc.parallelize(yourSortedRdd.take(Nth))