Agregar la sum de la columna como nueva columna en el dataframe de PySpark

Estoy usando PySpark y tengo un dataframe Spark con un montón de columnas numéricas. Quiero agregar una columna que sea la sum de todas las demás columnas.

Supongamos que mi dataframe tenía las columnas “a”, “b” y “c”. Sé que puedo hacerlo:

df.withColumn('total_col', df.a + df.b + df.c) 

El problema es que no quiero escribir cada columna individualmente y agregarlas, especialmente si tengo muchas columnas. Quiero poder hacer esto automáticamente o especificando una lista de nombres de columna que quiero agregar. Hay otra manera de hacer esto?

Esto no era obvio. No veo una sum basada en filas de las columnas definidas en la API de marcos de datos de la chispa.

Versión 2

Esto se puede hacer de una manera bastante simple:

 newdf = df.withColumn('total', sum(df[col] for col in df.columns)) 

df.columns es suministrado por pyspark como una lista de cadenas que dan todos los nombres de columna en el Spark Dataframe. Para una sum diferente, puede proporcionar cualquier otra lista de nombres de columna en su lugar.

No probé esto como mi primera solución porque no estaba seguro de cómo se comportaría. Pero funciona.

Versión 1

Esto es demasiado complicado, pero también funciona.

Puedes hacerlo:

  1. use df.columns para obtener una lista de los nombres de las columnas
  2. Usa esa lista de nombres para hacer una lista de las columnas.
  3. pase esa lista a algo que invoque la función de adición sobrecargada de la columna de una manera funcional de tipo plegable

Con python’s reduce , un cierto conocimiento de cómo funciona la sobrecarga del operador y el código pyspark para las columnas aquí se convierte en:

 def column_add(a,b): return a.__add__(b) newdf = df.withColumn('total_col', reduce(column_add, ( df[col] for col in df.columns ) )) 

Tenga en cuenta que esto es una reducción de python, no una reducción de chispa RDD, y el término de paréntesis en el segundo parámetro para reducir requiere el paréntesis porque es una expresión generadora de listas.

Probado, funciona!

 $ pyspark >>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache() >>> df DataFrame[a: bigint, b: bigint, c: bigint] >>> df.columns ['a', 'b', 'c'] >>> def column_add(a,b): ... return a.__add__(b) ... >>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect() [Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)] 

La solución

 newdf = df.withColumn('total', sum(df[col] for col in df.columns)) 

Publicado por @Paul trabaja. Sin embargo, estaba recibiendo el error, tantos otros como he visto,

 TypeError: 'Column' object is not callable 

Después de algún tiempo encontré el problema (al menos en mi caso). El problema es que previamente importé algunas funciones de pyspark con la línea

 from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min 

por lo tanto, la línea importó el comando sum pyspark mientras que df.withColumn('total', sum(df[col] for col in df.columns)) se supone que usa la función de sum python normal.

Puede eliminar la referencia de la función pyspark con del sum .

De lo contrario en mi caso cambié la importación a

 import pyspark.sql.functions as F 

y luego hizo referencia a las funciones como F.sum .

La forma más directa de hacerlo es usar la función expr

 from pyspark.sql.functions import * data = data.withColumn('total', expr("col1 + col2 + col3 + col4")) 

Mi problema fue similar al anterior (un poco más complejo) ya que tuve que agregar sums de columnas consecutivas como nuevas columnas en el dataframe de PySpark. Este enfoque utiliza el código de la versión 1 de Paul anterior:

 import pyspark from pyspark.sql import SparkSession import pandas as pd spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate() df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\ ,(6,1,-4),(0,2,-2),(6,4,1)\ ,(4,5,2),(5,-3,-5),(6,4,-1)]\ ,schema=['x1','x2','x3']) df.show() +---+---+---+ | x1| x2| x3| +---+---+---+ | 1| 2| 3| | 4| 5| 6| | 3| 2| 1| | 6| 1| -4| | 0| 2| -2| | 6| 4| 1| | 4| 5| 2| | 5| -3| -5| | 6| 4| -1| +---+---+---+ colnames=df.columns 

Agregue nuevas columnas que sean sums acumulativas (consecutivas):

 for i in range(0,len(colnames)): colnameLst= colnames[0:i+1] colname = 'cm'+ str(i+1) df = df.withColumn(colname, sum(df[col] for col in colnameLst)) 

df.show ()

 +---+---+---+---+---+---+ | x1| x2| x3|cm1|cm2|cm3| +---+---+---+---+---+---+ | 1| 2| 3| 1| 3| 6| | 4| 5| 6| 4| 9| 15| | 3| 2| 1| 3| 5| 6| | 6| 1| -4| 6| 7| 3| | 0| 2| -2| 0| 2| 0| | 6| 4| 1| 6| 10| 11| | 4| 5| 2| 4| 9| 11| | 5| -3| -5| 5| 2| -3| | 6| 4| -1| 6| 10| 9| +---+---+---+---+---+---+ 

Las columnas de ‘sum acumulativa’ añadidas son las siguientes:

 cm1 = x1 cm2 = x1 + x2 cm3 = x1 + x2 + x3