agrupando filas consecutivas en el dataframe de PySpark

Tengo el siguiente ejemplo Spark DataFrame:

rdd = sc.parallelize([(1,"19:00:00", "19:30:00", 30), (1,"19:30:00", "19:40:00", 10),(1,"19:40:00", "19:43:00", 3), (2,"20:00:00", "20:10:00", 10), (1,"20:05:00", "20:15:00", 10),(1,"20:15:00", "20:35:00", 20)]) df = spark.createDataFrame(rdd, ["user_id", "start_time", "end_time", "duration"]) df.show() +-------+----------+--------+--------+ |user_id|start_time|end_time|duration| +-------+----------+--------+--------+ | 1| 19:00:00|19:30:00| 30| | 1| 19:30:00|19:40:00| 10| | 1| 19:40:00|19:43:00| 3| | 2| 20:00:00|20:10:00| 10| | 1| 20:05:00|20:15:00| 10| | 1| 20:15:00|20:35:00| 20| +-------+----------+--------+--------+ 

Quiero agrupar filas consecutivas en función de las horas de inicio y finalización. Por ejemplo, para el mismo user_id, si la hora de inicio de una fila es la misma que la hora de finalización de la fila anterior, quiero agruparlos y sumr la duración.

El resultado deseado es:

 +-------+----------+--------+--------+ |user_id|start_time|end_time|duration| +-------+----------+--------+--------+ | 1| 19:00:00|19:43:00| 43| | 2| 20:00:00|20:10:00| 10| | 1| 20:05:00|20:35:00| 30| +-------+----------+--------+--------+ 

Las primeras tres filas del dataframe se agruparon porque todas corresponden a user_id 1 y las horas de inicio y finalización forman una línea de tiempo continua.

Este fue mi enfoque inicial:

Utilice la función de retraso para obtener la próxima hora de inicio:

 from pyspark.sql.functions import * from pyspark.sql import Window import sys # compute next start time window = Window.partitionBy('user_id').orderBy('start_time') df = df.withColumn("next_start_time", lag(df.start_time, -1).over(window)) df.show() +-------+----------+--------+--------+---------------+ |user_id|start_time|end_time|duration|next_start_time| +-------+----------+--------+--------+---------------+ | 1| 19:00:00|19:30:00| 30| 19:30:00| | 1| 19:30:00|19:40:00| 10| 19:40:00| | 1| 19:40:00|19:43:00| 3| 20:05:00| | 1| 20:05:00|20:15:00| 10| 20:15:00| | 1| 20:15:00|20:35:00| 20| null| | 2| 20:00:00|20:10:00| 10| null| +-------+----------+--------+--------+---------------+ 

obtener la diferencia entre la hora de finalización de la fila actual y la hora de inicio de la siguiente fila:

 time_fmt = "HH:mm:ss" timeDiff = unix_timestamp('next_start_time', format=time_fmt) - unix_timestamp('end_time', format=time_fmt) df = df.withColumn("difference", timeDiff) df.show() +-------+----------+--------+--------+---------------+----------+ |user_id|start_time|end_time|duration|next_start_time|difference| +-------+----------+--------+--------+---------------+----------+ | 1| 19:00:00|19:30:00| 30| 19:30:00| 0| | 1| 19:30:00|19:40:00| 10| 19:40:00| 0| | 1| 19:40:00|19:43:00| 3| 20:05:00| 1320| | 1| 20:05:00|20:15:00| 10| 20:15:00| 0| | 1| 20:15:00|20:35:00| 20| null| null| | 2| 20:00:00|20:10:00| 10| null| null| +-------+----------+--------+--------+---------------+----------+ 

Ahora mi idea era usar la función de sum con una ventana para obtener la sum acumulativa de la duración y luego hacer un grupoBy. Pero mi enfoque fue defectuoso por muchas razones.

Aquí hay un enfoque:

Reúna filas en grupos donde un grupo es un conjunto de filas con el mismo user_id que son consecutivos ( start_time coincide con end_time anterior). Luego puedes usar este group para hacer tu agregación.

Una forma de llegar aquí es mediante la creación de columnas de indicadores intermedios para decirle si el usuario ha cambiado o si el tiempo no es consecutivo. Luego realice una sum acumulativa sobre la columna del indicador para crear el group .

Por ejemplo:

 import pyspark.sql.functions as f from pyspark.sql import Window w1 = Window.orderBy("start_time") df = df.withColumn( "userChange", (f.col("user_id") != f.lag("user_id").over(w1)).cast("int") )\ .withColumn( "timeChange", (f.col("start_time") != f.lag("end_time").over(w1)).cast("int") )\ .fillna( 0, subset=["userChange", "timeChange"] )\ .withColumn( "indicator", (~((f.col("userChange") == 0) & (f.col("timeChange")==0))).cast("int") )\ .withColumn( "group", f.sum(f.col("indicator")).over(w1.rangeBetween(Window.unboundedPreceding, 0)) ) df.show() #+-------+----------+--------+--------+----------+----------+---------+-----+ #|user_id|start_time|end_time|duration|userChange|timeChange|indicator|group| #+-------+----------+--------+--------+----------+----------+---------+-----+ #| 1| 19:00:00|19:30:00| 30| 0| 0| 0| 0| #| 1| 19:30:00|19:40:00| 10| 0| 0| 0| 0| #| 1| 19:40:00|19:43:00| 3| 0| 0| 0| 0| #| 2| 20:00:00|20:10:00| 10| 1| 1| 1| 1| #| 1| 20:05:00|20:15:00| 10| 1| 1| 1| 2| #| 1| 20:15:00|20:35:00| 20| 0| 0| 0| 2| #+-------+----------+--------+--------+----------+----------+---------+-----+ 

Ahora que tenemos la columna del group , podemos agregar lo siguiente para obtener el resultado deseado:

 df.groupBy("user_id", "group")\ .agg( f.min("start_time").alias("start_time"), f.max("end_time").alias("end_time"), f.sum("duration").alias("duration") )\ .drop("group")\ .show() #+-------+----------+--------+--------+ #|user_id|start_time|end_time|duration| #+-------+----------+--------+--------+ #| 1| 19:00:00|19:43:00| 43| #| 1| 20:05:00|20:35:00| 30| #| 2| 20:00:00|20:10:00| 10| #+-------+----------+--------+--------+ 

Aquí hay una solución de trabajo derivada de la respuesta de Pault:

Crear el dataframe:

 rdd = sc.parallelize([(1,"19:00:00", "19:30:00", 30), (1,"19:30:00", "19:40:00", 10),(1,"19:40:00", "19:43:00", 3), (2,"20:00:00", "20:10:00", 10), (1,"20:05:00", "20:15:00", 10),(1,"20:15:00", "20:35:00", 20)]) df = spark.createDataFrame(rdd, ["user_id", "start_time", "end_time", "duration"]) df.show() +-------+----------+--------+--------+ |user_id|start_time|end_time|duration| +-------+----------+--------+--------+ | 1| 19:00:00|19:30:00| 30| | 1| 19:30:00|19:40:00| 10| | 1| 19:40:00|19:43:00| 3| | 1| 20:05:00|20:15:00| 10| | 1| 20:15:00|20:35:00| 20| +-------+----------+--------+--------+ 

Cree una columna de indicador que indique cuándo ha cambiado la hora, y use la sum acumulada para dar a cada grupo una identificación única:

 import pyspark.sql.functions as f from pyspark.sql import Window w1 = Window.partitionBy('user_id').orderBy('start_time') df = df.withColumn( "indicator", (f.col("start_time") != f.lag("end_time").over(w1)).cast("int") )\ .fillna( 0, subset=[ "indicator"] )\ .withColumn( "group", f.sum(f.col("indicator")).over(w1.rangeBetween(Window.unboundedPreceding, 0)) ) df.show() +-------+----------+--------+--------+---------+-----+ |user_id|start_time|end_time|duration|indicator|group| +-------+----------+--------+--------+---------+-----+ | 1| 19:00:00|19:30:00| 30| 0| 0| | 1| 19:30:00|19:40:00| 10| 0| 0| | 1| 19:40:00|19:43:00| 3| 0| 0| | 1| 20:05:00|20:15:00| 10| 1| 1| | 1| 20:15:00|20:35:00| 20| 0| 1| +-------+----------+--------+--------+---------+-----+ 

Ahora GroupBy en la identificación de usuario y la variable de grupo.

 +-------+----------+--------+--------+ |user_id|start_time|end_time|duration| +-------+----------+--------+--------+ | 1| 19:00:00|19:43:00| 43| | 1| 20:05:00|20:35:00| 30| +-------+----------+--------+--------+