¿Cómo equilibrar mis datos en las particiones?

Edición : la respuesta es de ayuda, pero describí mi solución en: problema de memoria en Spark .


Tengo un RDD con particiones 202092, que lee un conjunto de datos creado por otros. Puedo ver manualmente que los datos no están equilibrados en las particiones, por ejemplo, algunas de ellas tienen 0 imágenes y otras tienen 4k, mientras que la media está en 432. Al procesar los datos, recibí este error:

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

mientras memoryOverhead ya está impulsado. Siento que están ocurriendo algunos picos que hacen que Yarn mate mi contenedor, porque ese pico desborda los bordes especificados.

Entonces, ¿qué debo hacer para asegurarme de que mis datos estén (aproximadamente) equilibrados en las particiones?


Mi idea era que la repartición () funcionaría, invoca la reproducción aleatoria:

 dataset = dataset.repartition(202092) 

pero acabo de recibir el mismo error, a pesar de las instrucciones de la guía de progtwigción :

repartition (numPartitions)

Reordena los datos en el RDD de forma aleatoria para crear más o menos particiones y equilibrarlas . Esto siempre baraja todos los datos a través de la red.


Revisa mi ejemplo de juguete sin embargo:

 data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000)) d = data.glom().collect() len(d[0]) # 1000 len(d[1]) # 2000 len(d[2]) # 3000 repartitioned_data = data.repartition(3) re_d = repartitioned_data.glom().collect() len(re_d[0]) # 1854 len(re_d[1]) # 1754 len(re_d[2]) # 2392 repartitioned_data = data.repartition(6) re_d = repartitioned_data.glom().collect() len(re_d[0]) # 422 len(re_d[1]) # 845 len(re_d[2]) # 1643 len(re_d[3]) # 1332 len(re_d[4]) # 1547 len(re_d[5]) # 211 repartitioned_data = data.repartition(12) re_d = repartitioned_data.glom().collect() len(re_d[0]) # 132 len(re_d[1]) # 265 len(re_d[2]) # 530 len(re_d[3]) # 1060 len(re_d[4]) # 1025 len(re_d[5]) # 145 len(re_d[6]) # 290 len(re_d[7]) # 580 len(re_d[8]) # 1113 len(re_d[9]) # 272 len(re_d[10]) # 522 len(re_d[11]) # 66 

El límite de sobrecarga de memoria que supera el problema creo que se debe a los búferes de DirectMemory utilizados durante la búsqueda. Creo que está arreglado en 2.0.0. (Tuvimos el mismo problema, pero dejamos de profundizar mucho más cuando descubrimos que la actualización a 2.0.0 lo resolvió. Desafortunadamente, no tengo números de Spark que me respalden).


Las particiones desiguales después del repartition son sorprendentes. Contraste con https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443 . Spark incluso genera claves aleatorias en el repartition , por lo que no se hace con un hash que podría estar sesgado.

Probé tu ejemplo y obtuve exactamente los mismos resultados con Spark 1.6.2 y Spark 2.0.0. Pero no de Scala:

 scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator } data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at :24 scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq res1: Seq[Int] = WrappedArray(1000, 2000, 3000) scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq res2: Seq[Int] = WrappedArray(1999, 2001, 2000) scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000) scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500) 

¡Qué hermosas particiones!


(Lo siento, no es una respuesta completa. Solo quería compartir mis conclusiones hasta ahora).