Hacia la limitación del gran RDD.

Estoy leyendo muchas imágenes y me gustaría trabajar en un pequeño subconjunto de ellas para desarrollarlas. Como resultado, estoy tratando de entender cómo chispa y python podrían hacer que eso suceda:

In [1]: d = sqlContext.read.parquet('foo') In [2]: d.map(lambda x: x.photo_id).first() Out[2]: u'28605' In [3]: d.limit(1).map(lambda x: x.photo_id) Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43 In [4]: d.limit(1).map(lambda x: x.photo_id).first() // still running... 

..so que esta pasando? Espero que el límite () se ejecute mucho más rápido de lo que teníamos en [2] , pero ese no es el caso * .

A continuación describiré mi comprensión y, por favor, corríjame, ya que obviamente me falta algo:

  1. d es un RDD de pares (lo sé por el esquema) y lo digo con la función de mapa:

    i) Tome cada par (que se llamará x y photo_id atributo photo_id ).

    ii) Eso resultará en un nuevo RDD (anónimo), en el que estamos aplicando el first() método first() , que no estoy seguro de cómo funciona $ , pero debería darme el primer elemento de ese RDD anónimo.

  2. En [3] , limitamos el d RDD a 1, lo que significa que a pesar de que d tiene muchos elementos, use solo 1 y aplique la función de mapa a ese único elemento. El Out [3] debe ser el RDD creado por el mapeo.

  3. En [4] , esperaría seguir la lógica de [3] y simplemente imprimir el único elemento del RDD limitado …

Como se esperaba, después de mirar el monitor, [4] parece procesar todo el conjunto de datos , mientras que los otros no, así que parece que no estoy usando limit() correctamente, o que eso no es lo que estoy buscando:

introduzca la descripción de la imagen aquí


Editar:

 tiny_d = d.limit(1).map(lambda x: x.photo_id) tiny_d.map(lambda x: x.photo_id).first() 

El primero otorgará un PipelinedRDD , que, como se describe aquí , no realizará ninguna acción , solo una transformación.

Sin embargo, la segunda línea también procesará el conjunto de datos completo (de hecho, el número de Tareas ahora son tantas como antes, más una).


* [2] ejecutado instantáneamente, mientras que [4] todavía se está ejecutando y> 3h han pasado ..

$ No pude encontrarlo en la documentación, por el nombre.

Basado en su código, aquí hay un caso de prueba más simple en Spark 2.0

 case class my (x: Int) val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) } val df1 = spark.createDataFrame(rdd) val df2 = df1.limit(1) df1.map { r => r.getAs[Int](0) }.first df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line 

En realidad, Dataset.first es equivalente a Dataset.limit (1) .collect, así que verifique el plan físico de los dos casos:

 scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain == Physical Plan == CollectLimit 1 +- *SerializeFromObject [input[0, int, true] AS value#124] +- *MapElements , obj#123: int +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row +- Scan ExistingRDD[x#74] scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain == Physical Plan == CollectLimit 1 +- *SerializeFromObject [input[0, int, true] AS value#131] +- *MapElements , obj#130: int +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row +- *GlobalLimit 1 +- Exchange SinglePartition +- *LocalLimit 1 +- Scan ExistingRDD[x#74] 

Para el primer caso, está relacionado con una optimización en el operador físico CollectLimitExec. Es decir, primero buscará la primera partición para obtener el número límite de filas, 1 en este caso, si no está satisfecho, luego obtendrá más particiones, hasta que se scope el límite deseado. Por lo general, si la primera partición no está vacía, solo se calculará y recuperará la primera partición. Otras particiones ni siquiera serán computadas.

Sin embargo, en el segundo caso, la optimización en CollectLimitExec no ayuda, porque la operación de límite anterior involucra una operación aleatoria. Se calcularán todas las particiones y se ejecutará LocalLimit (1) en cada partición para obtener 1 fila, y luego todas las particiones se barajarán en una sola partición. CollectLimitExec recuperará 1 fila de la partición única resultante.