Transformación de estilo pandas de datos agrupados en PySpark DataFrame

Si tenemos un dataframe de Pandas que consta de una columna de categorías y una columna de valores, podemos eliminar la media de cada categoría haciendo lo siguiente:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g)) 

Según tengo entendido, los marcos de datos de Spark no ofrecen directamente esta operación de agrupación / transformación (estoy usando PySpark en Spark 1.5.0). Entonces, ¿cuál es la mejor manera de implementar este cálculo?

He intentado usar un group-by / join de la siguiente manera:

 df2 = df.groupBy("Category").mean("Values") df3 = df2.join(df) 

Pero es muy lento ya que, según tengo entendido, cada categoría requiere un análisis completo del dataframe.

Pienso (pero no he verificado) que puedo acelerar mucho esto si recojo el resultado de group by by mean en un diccionario y luego lo uso en un UDF de la siguiente manera:

 nameToMean = {...} f = lambda category, value: value - nameToMean[category] categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType()) df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value)) 

¿Existe una forma idiomática de express este tipo de operación sin sacrificar el rendimiento?

Entiendo, cada categoría requiere un análisis completo del dataframe.

No, no lo hace. Las agregaciones de DataFrame se realizan utilizando una lógica similar a aggregateByKey . Consulte el grupo de DataFrame por comportamiento / optimización. Una parte más lenta es la join que requiere clasificación / barajado. Pero todavía no requiere escaneo por grupo.

Si este es un código exacto, su uso es lento porque no proporciona una expresión de unión. Por eso simplemente realiza un producto cartesiano. Así que no solo es ineficiente sino también incorrecto. Quieres algo como esto:

 from pyspark.sql.functions import col means = df.groupBy("Category").mean("Values").alias("means") df.alias("df").join(means, col("df.Category") == col("means.Category")) 

Pienso (pero no he verificado) que puedo acelerar mucho esto si recojo el resultado de group by by mean en un diccionario y luego lo uso en un UDF

Es posible, aunque el rendimiento varía según el caso. Un problema con el uso de UDF de Python es que tiene que mover datos hacia y desde Python. Aún así, definitivamente vale la pena intentarlo. nameToMean embargo, debe considerar el uso de una variable de difusión para nameToMean .

¿Existe una forma idiomática de express este tipo de operación sin sacrificar el rendimiento?

En PySpark 1.6 puede utilizar la función de broadcast :

 df.alias("df").join( broadcast(means), col("df.Category") == col("means.Category")) 

pero no está disponible en <= 1.5.

En realidad, hay una forma idiomática de hacer esto en Spark, usando la expresión Hive OVER .

es decir

 df.registerTempTable('df') with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df') 

Bajo el capó, esto está utilizando una función de ventana. Sin embargo, no estoy seguro de si esto es más rápido que tu solución.