PySpark – Superposición de tiempo para objeto en RDD

Mi objective es agrupar objetos en función de la superposición de tiempo.

Cada objeto en mi rdd contiene un start_time y end_time .

Probablemente estoy haciendo esto de manera ineficiente, pero lo que planeo hacer es asignar una identificación de superposición a cada objeto en función de si se ha superpuesto en algún momento con alguno de los otros objetos. Tengo la lógica del tiempo se superponen. Entonces, espero overlap_id por ese overlap_id .

Así que primero

 mapped_rdd = rdd.map(assign_overlap_id) final_rdd = mapped_rdd.reduceByKey(combine_objects) 

Ahora esto viene a mi pregunta. ¿Cómo puedo escribir la función assign_overlap_id?

 def assign_overlap_id(x): ... ... return (overlap_id, x) 

Solución ingenua utilizando Spark SQL y Data Frames:

Scala:

 import org.apache.spark.sql.functions.udf case class Interval(start_time: Long, end_time: Long) val rdd = sc.parallelize( Interval(0, 3) :: Interval(1, 4) :: Interval(2, 5) :: Interval(3, 4) :: Interval(5, 8) :: Interval(7, 10) :: Nil ) val df = sqlContext.createDataFrame(rdd) // Simple check if a given intervals overlap def overlaps(start_first: Long, end_first: Long, start_second: Long, end_second: Long):Boolean = { (start_second > start_first & start_second < end_first) | (end_second > start_first & end_second < end_first) } // Register udf and data frame aliases // It look like Spark SQL doesn't support // aliases in FROM clause [1] so we have to // register df twice sqlContext.udf.register("overlaps", overlaps) df.registerTempTable("df1") df.registerTempTable("df2") // Join and filter sqlContext.sql(""" SELECT * FROM df1 JOIN df2 WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time) """).show 

Y lo mismo usando PySpark.

 from pyspark.sql.functions import udf from pyspark.sql.types import BooleanType rdd = sc.parallelize([ (0, 3), (1, 4), (2, 5), (3, 4), (5, 8), (7, 10) ]) df = sqlContext.createDataFrame(rdd, ('start_time', 'end_time')) def overlaps(start_first, end_first, start_second, end_second): return ((start_first < start_second < end_first) or (start_first < end_second < end_first)) sqlContext.registerFunction('overlaps', overlaps, BooleanType()) df.registerTempTable("df1") df.registerTempTable("df2") sqlContext.sql(""" SELECT * FROM df1 JOIN df2 WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time) """).show() 

Transformaciones de bajo nivel con agrupación por ventana.

Un enfoque un poco más inteligente es generar pares candidatos utilizando una ventana de un ancho específico. Aquí hay una solución bastante simplificada:

Scala:

 // Generates list of "buckets" for a given interval def genRange(interval: Interval) = interval match { case Interval(start_time, end_time) => { (start_time / 10L * 10L) to (((end_time / 10) + 1) * 10) by 1 } } // For each interval generate pairs (bucket, interval) val pairs = rdd.flatMap( (i: Interval) => genRange(i).map((r) => (r, i))) // Join (in the worst case scenario it is still O(n^2) // But in practice should be better than a naive // Cartesian product val candidates = pairs. join(pairs). map({ case (k, (Interval(s1, e1), Interval(s2, e2))) => (s1, e1, s2, e2) }).distinct // For each candidate pair check if there is overlap candidates.filter { case (s1, e1, s2, e2) => overlaps(s1, e1, s2, e2) } 

Pitón:

 def genRange(start_time, end_time): return xrange(start_time / 10L * 10L, ((end_time / 10) + 1) * 10) pairs = rdd.flatMap(lambda (s, e): ((r, (s, e)) for r in genRange(s, e))) candidates = (pairs .join(pairs) .map(lambda (k, ((s1, e1), (s2, e2))): (s1, e1, s2, e2)) .distinct()) candidates.filter(lambda (s1, e1, s2, e2): overlaps(s1, e1, s2, e2)) 

Si bien puede ser suficiente en algunas pruebas de datos para una solución lista para producción, debe considerar implementar algún algoritmo de vanguardia como NCList .

  1. http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html