Explicar la funcionalidad agregada en Spark.

Estoy buscando una mejor explicación de la funcionalidad agregada que está disponible a través de spark en python.

El ejemplo que tengo es el siguiente (usando pyspark de la versión Spark 1.2.0)

sc.parallelize([1,2,3,4]).aggregate( (0, 0), (lambda acc, value: (acc[0] + value, acc[1] + 1)), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))) 

Salida:

 (10, 4) 

Obtengo el resultado esperado (10,4) que es la sum de 1+2+3+4 y 4 elementos. Si cambio el valor inicial pasado a la función agregada a (1,0) desde (0,0) obtengo el siguiente resultado

 sc.parallelize([1,2,3,4]).aggregate( (1, 0), (lambda acc, value: (acc[0] + value, acc[1] + 1)), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))) 

Salida:

 (19, 4) 

El valor aumenta en 9. Si lo cambio a (2,0) , el valor va a (28,4) y así sucesivamente.

¿Puede alguien explicarme cómo se calcula este valor? Esperaba que el valor subiera en 1, no en 9, esperaba ver (11,4) cambio, estoy viendo (19,4) .

No tengo suficientes puntos de reputación para comentar sobre la respuesta anterior de Maasg. En realidad, el valor cero debe ser ‘neutral’ hacia el seqop, lo que significa que no interferirá con el resultado del seqop, como 0 hacia sumr, o 1 hacia *;

NUNCA debe intentar con valores no neutrales, ya que podrían aplicarse tiempos arbitrarios. Este comportamiento no solo está vinculado a varias particiones.

Intenté el mismo experimento como se indica en la pregunta. con 1 partición, el valor cero se aplicó 3 veces. con 2 particiones, 6 veces. Con 3 particiones, 9 veces y esto continuará.

No estaba totalmente convencido de la respuesta aceptada, y la respuesta de JohnKnight ayudó, así que aquí está mi punto de vista:

Primero, expliquemos el agregado () con mis propias palabras:

Prototipo :

agregado (valor cero, seqOp, combOp)

Descripción :

aggregate() permite tomar un RDD y generar un valor único que es de un tipo diferente al que estaba almacenado en el RDD original.

Parámetros :

  1. zeroValue : el valor de inicialización, para su resultado, en el formato deseado.
  2. seqOp : la operación que desea aplicar a los registros RDD. Se ejecuta una vez por cada registro en una partición.
  3. combOp : define cómo se combOp los objetos resultantes (uno para cada partición).

Ejemplo :

Calcule la sum de una lista y la longitud de esa lista. Devuelve el resultado en un par de (sum, length) .

En un shell de Spark, primero creé una lista con 4 elementos, con 2 particiones :

 listRDD = sc.parallelize([1,2,3,4], 2) 

entonces definí mi seqOp :

 seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) ) 

y mi combOp :

 combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) ) 

y luego agregué:

 listRDD.aggregate( (0, 0), seqOp, combOp) Out[8]: (10, 4) 

Como puede ver, le di nombres descriptivos a mis variables, pero permítame explicarlo más:

La primera partición tiene la lista secundaria [1, 2]. Aplicaremos el seqOp a cada elemento de esa lista y esto producirá un resultado local, un par de (sum, length) , que reflejará el resultado localmente, solo en esa primera partición.

Entonces, comencemos: local_result se inicializa con el parámetro zeroValue que proporcionamos el aggregate() , es decir (0, 0) y list_element es el primer elemento de la lista, es decir 1. Como resultado, esto es lo que sucede:

 0 + 1 = 1 0 + 1 = 1 

Ahora, el resultado local es (1, 1), eso significa que, hasta ahora, para la primera partición, después de procesar solo el primer elemento, la sum es 1 y la longitud 1. Observe que local_result se actualiza de (0, 0), a (1, 1).

 1 + 2 = 3 1 + 1 = 2 

y ahora el resultado local es (3, 2), que será el resultado final de la primera partición, ya que no hay otros elementos en la lista secundaria de la primera partición.

Haciendo lo mismo para la segunda partición, obtenemos (7, 2).

Ahora aplicamos el combOp a cada resultado local, para que podamos formar el resultado global final, como este: (3,2) + (7,2) = (10, 4)


Ejemplo descrito en ‘figura’:

  (0, 0) <-- zeroValue [1, 2] [3, 4] 0 + 1 = 1 0 + 3 = 3 0 + 1 = 1 0 + 1 = 1 1 + 2 = 3 3 + 4 = 7 1 + 1 = 2 1 + 1 = 2 | | vv (3, 2) (7, 2) \ / \ / \ / \ / \ / \ / ------------ | combOp | ------------ | v (10, 4) 

Inspirado en este gran ejemplo .


Entonces, si el valor de zeroValue no es (0, 0), pero (1, 0), uno esperaría obtener (8 + 4, 2 + 2) = (12, 4), lo que no explica lo que experimenta. Incluso si modificamos el número de particiones de mi ejemplo, no podré volver a obtenerlo.

La clave aquí es la respuesta de JohnKnight, que establece que el valor zeroValue no es solo análogo al número de particiones, sino que puede aplicarse más veces de lo que espera.

El agregado le permite transformar y combinar los valores del RDD a voluntad.

Utiliza dos funciones:

El primero transforma y agrega los elementos de la colección original [T] en un agregado local [U] y toma la forma: (U, T) => U. Puede verlo como un pliegue y, por lo tanto, también requiere un cero. Para esa operación. Esta operación se aplica localmente a cada partición en paralelo.

Aquí es donde radica la clave de la pregunta: el único valor que debe usarse aquí es el valor CERO para la operación de reducción. Esta operación se ejecuta localmente en cada partición, por lo tanto, agregar cualquier valor a ese valor cero se sumrá al resultado multiplicado por el número de particiones del RDD.

La segunda operación toma 2 valores del tipo de resultado de la operación anterior [U] y lo combina en un solo valor. Esta operación reducirá los resultados parciales de cada partición y producirá el total real.

Por ejemplo: Dado un RDD de cadenas:

 val rdd:RDD[String] = ??? 

Digamos que desea agregar la longitud de las cadenas en ese RDD, por lo que haría:

1) La primera operación transformará las cadenas en tamaño (int) y acumulará los valores para el tamaño.

 val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght` 

2) proporcionar el CERO para la operación de adición (0)

 val ZERO = 0 

3) una operación para sumr dos enteros juntos:

 val add: (Int, Int) => Int = _ + _ 

Poniendolo todo junto:

 rdd.aggregate(ZERO, stringSizeCummulator, add) 

Entonces, ¿por qué se necesita el CERO? Cuando la función cummulator se aplica al primer elemento de una partición, no hay un total acumulado. ZERO se utiliza aquí.

P.ej. Mi RDD es: – Partición 1: [“Salto”, “sobre”] – Partición 2: [“el”, “muro”]

Esto resultará:

P1:

  1. stringSizeCummulator (ZERO, “Jump”) = 4
  2. stringSizeCummulator (4, “over”) = 8

P2:

  1. stringSizeCummulator (ZERO, “the”) = 3
  2. stringSizeCummulator (3, “wall”) = 7

Reducir: agregar (P1, P2) = 15

Grandes explicaciones, realmente me ayudó a comprender el funcionamiento subyacente de la función agregada. He jugado con él durante algún tiempo y lo descubrí a continuación.

  • Si está utilizando el acc como (0,0), entonces no cambiará el resultado de la salida de la función.

  • Si se cambia el acumulador inicial, se procesará el resultado como se muestra a continuación.

[sum de elementos RDD + valor inicial acc * No. de particiones RDD + valor inicial acc]

para la pregunta aquí, sugeriría que verifique las particiones ya que el número de particiones debería ser 8, de acuerdo con mi entendimiento, ya que cada vez que procesamos la secuencia en una partición de RDD, comenzará con la sum inicial del resultado acc y también cuando va a hacer el peine Op. Volverá a usar el valor inicial acc una vez.

por ejemplo, lista (1,2,3,4) y acc (1,0)

Obtener particiones en scala por RDD.partitions.size

si las particiones son 2 y el número de elementos es 4, entonces => [10 + 1 * 2 + 1] => (13,4)

si la partición es 4 y el número de elementos es 4, entonces => [10 + 1 * 4 + 1] => (15,4)

Espero que esto ayude, puede consultar aquí para una explicación. Gracias.

Puede usar el siguiente código (en scala) para ver con precisión qué está haciendo el aggregate . Construye un árbol de todas las operaciones de sum y fusión:

 sealed trait Tree[+A] case class Leaf[A](value: A) extends Tree[A] case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A] val zero : Tree[Int] = Leaf(0) val rdd = sc.parallelize(1 to 4).repartition(3) 

Y luego, en la concha:

 scala> rdd.glom().collect() res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3)) 

Entonces, tenemos estas 3 particiones: [4], [1,2] y [3].

 scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r)) res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2))) 

Puedes representar el resultado como un árbol:

 + | \__________________ + + | \________ | \ + + + 2 | \ | \ | \ 0 + 0 3 0 1 | \ 0 4 

Puede ver que se crea un primer elemento cero en el nodo del controlador (a la izquierda del árbol) y, a continuación, los resultados de todas las particiones se fusionan uno por uno. También verá que si reemplaza 0 por 1 como lo hizo en su pregunta, agregará 1 a cada resultado en cada partición y también agregará 1 al valor inicial en el controlador. Entonces, el número total de tiempo que se usa el valor cero que das es:

number of partitions + 1 .

Así, en tu caso, el resultado de

 aggregate( (X, Y), (lambda acc, value: (acc[0] + value, acc[1] + 1)), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))) 

estarán:

 (sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y) 

La implementación del aggregate es bastante simple. Se define en RDD.scala, línea 1107 :

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) val cleanSeqOp = sc.clean(seqOp) val cleanCombOp = sc.clean(combOp) val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult) sc.runJob(this, aggregatePartition, mergeResult) jobResult } 

Para las personas que buscan el código Equivalente de Scala para el ejemplo anterior, aquí está. Misma lógica, misma entrada / resultado.

 scala> val listRDD = sc.parallelize(List(1,2,3,4), 2) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :21 scala> listRDD.collect() res7: Array[Int] = Array(1, 2, 3, 4) scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2)) res10: (Int, Int) = (10,4) 

Intento muchos experimentos sobre esta pregunta. Es mejor establecer el número de partición para el agregado. el seqOp procesará cada partición y aplicará el valor inicial. Además, combOp también aplicará el valor inicial cuando combine todas las particiones. Entonces, les presento el formato para esta pregunta:

 final result = sum(list) + num_Of_Partitions * initial_Value + 1 

Gracias a las gsamaras.

Mi gráfico de vista es como abajo, introduzca la descripción de la imagen aquí