PySpark: calcular el máximo de la fila del subconjunto de columnas y agregarlo a un dataframe existente

Me gustaría calcular el máximo de un subconjunto de columnas para cada fila y agregarlo como una nueva columna para el Dataframe existente.

Me las arreglé para hacer esto de una manera muy incómoda:

 def add_colmax(df,subset_columns,colnm): ''' calculate the maximum of the selected "subset_columns" from dataframe df for each row, new column containing row wise maximum is added to dataframe df. df: dataframe. It must contain subset_columns as subset of columns colnm: Name of the new column containing row-wise maximum of subset_columns subset_columns: the subset of columns from w ''' from pyspark.sql.functions import monotonicallyIncreasingId from pyspark.sql import Row def get_max_row_with_None(row): return float(np.max(row)) df_subset = df.select(subset_columns) rdd = df_subset.map( get_max_row_with_None) df_rowsum = rdd.map(Row(colnm)).toDF() df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId()) df = df.withColumn("id",monotonicallyIncreasingId()) df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id) return df 

Esta función funciona como:

 rdd1 = sc.parallelize([("foo", 1.0,3.0,None), ("bar", 2.0,2.0,-10), ("baz", 3.3,1.2,10.0)]) df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4')) df_new = add_colmax(df1,['v2','v3','v4'],"rowsum") df_new.collect() 

devoluciones:

  [Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0), Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3), Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)] 

Creo que si pudiera usar funciones definidas por el usuario con withColumn , esto se puede hacer mucho más simple. Pero no pude averiguar cómo hacerlo. Por favor, avíseme si tiene una forma más sencilla de lograr esto. Estoy usando Spark 1.6

Empecemos con un par de importaciones.

 from pyspark.sql.functions import col, lit, coalesce, greatest 

A continuación, defina menos infinito literal:

 minf = lit(float("-inf")) 

Asignar columnas y pasar el resultado al greatest :

 rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']]) 

Finalmente con withColumn :

 df1.withColumn("rowmax", rowmax) 

con resultado

 +---+---+---+----+------+ | v1| v2| v3| v4|rowmax| +---+---+---+----+------+ |foo|1.0|3.0|null| 3.0| |bar|2.0|2.0| -10| 2.0| |baz|3.3|1.2|null| 3.3| +---+---+---+----+------+ 

Puede usar el mismo patrón con diferentes operaciones por minf reemplazan a minf con un elemento neutral. Por ejemplo:

 rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']]) 

o:

 from operator import mul from functools import reduce rowproduct = reduce( mul, [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']] ) 

Su propio código podría simplificarse significativamente con udf :

 from pyspark.sql.types import DoubleType from pyspark.sql.functions import udf def get_max_row_with_None_(*cols): return float(max(x for x in cols if x is not None)) get_max_row_with_None = udf(get_max_row_with_None_, DoubleType()) df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4')) 

Reemplace minf con lit(float("inf")) y greatest con least para obtener el valor más pequeño por fila.