Spark agrega una nueva columna al dataframe con el valor de la fila anterior

Me pregunto cómo puedo lograr lo siguiente en Spark (Pyspark)

Marco de datos inicial:

+--+---+ |id|num| +--+---+ |4 |9.0| +--+---+ |3 |7.0| +--+---+ |2 |3.0| +--+---+ |1 |5.0| +--+---+ 

Marco de datos resultante:

 +--+---+-------+ |id|num|new_Col| +--+---+-------+ |4 |9.0| 7.0 | +--+---+-------+ |3 |7.0| 3.0 | +--+---+-------+ |2 |3.0| 5.0 | +--+---+-------+ 

En general, puedo “agregar” nuevas columnas a un dataframe utilizando algo como: df.withColumn("new_Col", df.num * 10)

Sin embargo, no tengo idea de cómo puedo lograr este “cambio de filas” para la nueva columna, de modo que la nueva columna tenga el valor de un campo de la fila anterior (como se muestra en el ejemplo). Tampoco pude encontrar nada en la documentación de la API sobre cómo acceder a una determinada fila en un DF por índice.

Cualquier ayuda sería apreciada.

Puede utilizar la función de ventana de lag siguiente manera

 from pyspark.sql.functions import lag, col from pyspark.sql.window import Window df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"]) w = Window().partitionBy().orderBy(col("id")) df.select("*", lag("num").over(w).alias("new_col")).na.drop().show() ## +---+---+-------+ ## | id|num|new_col| ## +---+---+-------| ## | 2|3.0| 5.0| ## | 3|7.0| 3.0| ## | 4|9.0| 7.0| ## +---+---+-------+ 

Pero hay algunos problemas importantes:

  1. Si necesita una operación global (no particionada por alguna otra columna / columnas) es extremadamente ineficiente.
  2. Necesita una forma natural de ordenar sus datos.

Si bien el segundo problema casi nunca es un problema, el primero puede ser un factor decisivo. Si este es el caso, simplemente debe convertir su DataFrame a RDD y calcular el lag manualmente. Ver por ejemplo:

  • Cómo transformar datos con la ventana deslizante sobre los datos de series temporales en Pyspark
  • Apache Spark Moving Average (escrito en Scala, pero se puede ajustar para PySpark. Asegúrese de leer los comentarios primero).

Otros enlaces útiles:

  val df = sc.parallelize(Seq((4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0))).toDF("id", "num") df.show +---+---+ | id|num| +---+---+ | 4|9.0| | 3|7.0| | 2|3.0| | 1|5.0| +---+---+ df.withColumn("new_column", lag("num", 1, 0).over(w)).show +---+---+----------+ | id|num|new_column| +---+---+----------+ | 1|5.0| 0.0| | 2|3.0| 5.0| | 3|7.0| 3.0| | 4|9.0| 7.0| +---+---+----------+