¿La función distinta () de la chispa mezcla solo las tuplas distintas de cada partición?

Como entiendo notado (), el hash particiona el RDD para identificar las claves únicas. ¿Pero se optimiza al mover solo las diferentes tuplas por partición?

Imagina un RDD con las siguientes particiones

  1. [1, 2, 2, 1, 4, 2, 2]
  2. [1, 3, 3, 5, 4, 5, 5, 5]

De manera distinta en este RDD, ¿todas las claves duplicadas (2 en las particiones 1 y 5 en la partición 2) se barajarán en su partición de destino o solo las claves por partición se barajarán en el objective?

Si todas las teclas se barajan, entonces un agregado () con operaciones set () reducirá el orden aleatorio.

def set_update(u, v): u.add(v) return u rdd.aggregate(set(), set_update, lambda u1,u2: u1|u2) 

unique se implementa a través de reduceByKey en reduceByKey (element, None) . Así que baraja solo valores únicos por partición. Sin embargo, si el número de duplicados es bajo, sigue siendo una operación bastante costosa.

Hay situaciones en las que usar set puede ser útil. En particular, si llama a distinct en PairwseRDD , puede preferir combineByKey / combineByKey en combineByKey lugar para lograr la deduplicación y la partición por clave al mismo tiempo. En particular, considere el siguiente código:

 rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)]) rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")]) rdd1.distinct().join(rdd2) 

Tiene que barajar rdd1 dos veces, una para join y una para join . En su lugar puedes usar combineByKey :

 def flatten(kvs): (key, (left, right)) = kvs for v in left: yield (key, (v, right)) aggregated = (rdd1 .aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2)) rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions()) (aggregated.join(rdd2_partitioned) .flatMap(flatten)) 

Nota :

join lógica de join es un poco diferente en Scala que en Python (PySpark usa union seguida de groupByKey , vea Spark RDD groupByKey + join vs join performance para Python y Scala DAGs), por lo tanto, tenemos que particionar manualmente el segundo RDD antes de llamar a join .