Funciones de la ventana de chispa – rango Entre fechas

Estoy teniendo un Spark SQL DataFrame con datos y lo que estoy tratando de obtener son todas las filas que preceden a la fila actual en un rango de fechas determinado. Entonces, por ejemplo, quiero tener todas las filas de 7 días anteriores a la fila dada. Me di cuenta de que necesito usar una Window Function como:

 Window \ .partitionBy('id') \ .orderBy('start') 

Y aquí viene el problema. Quiero tener un rangeBetween de rangeBetween 7 días, pero no hay nada en la documentación de Spark que pueda encontrar sobre esto. ¿Spark incluso proporciona tal opción? Por ahora solo estoy obteniendo todas las filas anteriores con:

 .rowsBetween(-sys.maxsize, 0) 

pero quisiera lograr algo como:

 .rangeBetween("7 days", 0) 

Si alguien me puede ayudar en este caso, estaré muy agradecido. ¡Gracias por adelantado!

    Related of "Funciones de la ventana de chispa – rango Entre fechas"

    Chispa> = 2.3

    Desde Spark 2.3 es posible usar objetos de intervalo usando la API de SQL, pero el DataFrame API DataFrame todavía está en proceso .

     df.createOrReplaceTempView("df") spark.sql( """SELECT *, mean(some_value) OVER ( PARTITION BY id ORDER BY CAST(start AS timestamp) RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW ) AS mean FROM df""").show() ## +---+----------+----------+------------------+ ## | id| start|some_value| mean| ## +---+----------+----------+------------------+ ## | 1|2015-01-01| 20.0| 20.0| ## | 1|2015-01-06| 10.0| 15.0| ## | 1|2015-01-07| 25.0|18.333333333333332| ## | 1|2015-01-12| 30.0|21.666666666666668| ## | 2|2015-01-01| 5.0| 5.0| ## | 2|2015-01-03| 30.0| 17.5| ## | 2|2015-02-01| 20.0| 20.0| ## +---+----------+----------+------------------+ 

    Chispa <2.3

    Por lo que sé, no es posible directamente ni en Spark ni en Hive. Ambos requieren que la cláusula ORDER BY utilizada con RANGE sea ​​numérica. Lo más cercano que encontré es la conversión a la marca de tiempo y el funcionamiento en segundos. Suponiendo que la columna de start contiene el tipo de date :

     from pyspark.sql import Row row = Row("id", "start", "some_value") df = sc.parallelize([ row(1, "2015-01-01", 20.0), row(1, "2015-01-06", 10.0), row(1, "2015-01-07", 25.0), row(1, "2015-01-12", 30.0), row(2, "2015-01-01", 5.0), row(2, "2015-01-03", 30.0), row(2, "2015-02-01", 20.0) ]).toDF().withColumn("start", col("start").cast("date")) 

    Un pequeño ayudante y definición de ventana:

     from pyspark.sql.window import Window from pyspark.sql.functions import mean, col # Hive timestamp is interpreted as UNIX timestamp in seconds* days = lambda i: i * 86400 

    Finalmente consulta:

     w = (Window() .partitionBy(col("id")) .orderBy(col("start").cast("timestamp").cast("long")) .rangeBetween(-days(7), 0)) df.select(col("*"), mean("some_value").over(w).alias("mean")).show() ## +---+----------+----------+------------------+ ## | id| start|some_value| mean| ## +---+----------+----------+------------------+ ## | 1|2015-01-01| 20.0| 20.0| ## | 1|2015-01-06| 10.0| 15.0| ## | 1|2015-01-07| 25.0|18.333333333333332| ## | 1|2015-01-12| 30.0|21.666666666666668| ## | 2|2015-01-01| 5.0| 5.0| ## | 2|2015-01-03| 30.0| 17.5| ## | 2|2015-02-01| 20.0| 20.0| ## +---+----------+----------+------------------+ 

    Lejos de ser bonita pero funciona.


    * Manual del lenguaje Hive, tipos