¿Cómo agrego una nueva columna a un Spark DataFrame (usando PySpark)?

Tengo un Spark DataFrame (usando PySpark 1.5.1) y me gustaría agregar una nueva columna.

He intentado lo siguiente sin ningún éxito:

type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=['new_col']) spark_new_col = sqlContext.createDataFrame(new_col) my_df_spark.withColumn("hours", spark_new_col["new_col"]) 

También tengo un error al usar esto:

 my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) 

Entonces, ¿cómo agrego una nueva columna (basada en el vector Python) a un DataFrame existente con PySpark?

No puede agregar una columna arbitraria a un DataFrame en Spark. Las nuevas columnas solo se pueden crear utilizando literales (otros tipos de literales se describen en ¿Cómo agregar una columna constante en un Spark DataFrame? )

 from pyspark.sql.functions import lit df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) df_with_x4 = df.withColumn("x4", lit(0)) df_with_x4.show() ## +---+---+-----+---+ ## | x1| x2| x3| x4| ## +---+---+-----+---+ ## | 1| a| 23.0| 0| ## | 3| B|-23.0| 0| ## +---+---+-----+---+ 

transformando una columna existente:

 from pyspark.sql.functions import exp df_with_x5 = df_with_x4.withColumn("x5", exp("x3")) df_with_x5.show() ## +---+---+-----+---+--------------------+ ## | x1| x2| x3| x4| x5| ## +---+---+-----+---+--------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| ## | 3| B|-23.0| 0|1.026187963170189...| ## +---+---+-----+---+--------------------+ 

incluido usando join

 from pyspark.sql.functions import exp lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) df_with_x6 = (df_with_x5 .join(lookup, col("x1") == col("k"), "leftouter") .drop("k") .withColumnRenamed("v", "x6")) ## +---+---+-----+---+--------------------+----+ ## | x1| x2| x3| x4| x5| x6| ## +---+---+-----+---+--------------------+----+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo| ## | 3| B|-23.0| 0|1.026187963170189...|null| ## +---+---+-----+---+--------------------+----+ 

o generado con la función / udf:

 from pyspark.sql.functions import rand df_with_x7 = df_with_x6.withColumn("x7", rand()) df_with_x7.show() ## +---+---+-----+---+--------------------+----+-------------------+ ## | x1| x2| x3| x4| x5| x6| x7| ## +---+---+-----+---+--------------------+----+-------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617| ## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873| ## +---+---+-----+---+--------------------+----+-------------------+ 

En pyspark.sql.functions rendimiento, las funciones pyspark.sql.functions ( pyspark.sql.functions ), que se asignan a la expresión Catalyst, generalmente se prefieren a las funciones definidas por el usuario de Python.

Si desea agregar contenido de un RDD arbitrario como columna, puede

  • agregar números de fila al dataframe existente
  • llame a zipWithIndex en RDD y zipWithIndex a dataframe
  • unir ambos usando el índice como una clave de unión

Para agregar una columna usando un UDF:

 df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) from pyspark.sql.functions import udf from pyspark.sql.types import * def valueToCategory(value): if value == 1: return 'cat1' elif value == 2: return 'cat2' ... else: return 'n/a' # NOTE: it seems that calls to udf() must be after SparkContext() is called udfValueToCategory = udf(valueToCategory, StringType()) df_with_cat = df.withColumn("category", udfValueToCategory("x1")) df_with_cat.show() ## +---+---+-----+---------+ ## | x1| x2| x3| category| ## +---+---+-----+---------+ ## | 1| a| 23.0| cat1| ## | 3| B|-23.0| n/a| ## +---+---+-----+---------+ 

Para Spark 2.0

 # assumes schema has 'age' column df.select('*', (df.age + 10).alias('agePlusTen')) 

Me gustaría ofrecer un ejemplo generalizado para un caso de uso muy similar:

Caso de uso: Tengo un CSV que consiste en:

 First|Third|Fifth data|data|data data|data|data ...billion more lines 

Necesito realizar algunas transformaciones y el csv final debe verse como

 First|Second|Third|Fourth|Fifth data|null|data|null|data data|null|data|null|data ...billion more lines 

Necesito hacer esto porque este es el esquema definido por algún modelo y necesito que mis datos finales sean interoperables con las inserciones masivas de SQL y esas cosas.

asi que:

1) Leí el csv original usando spark.read y lo llamé “df”.

2) Hago algo a los datos.

3) Agrego las columnas nulas usando este script:

 outcols = [] for column in MY_COLUMN_LIST: if column in df.columns: outcols.append(column) else: outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column))) df = df.select(outcols) 

De esta manera, puede estructurar su esquema después de cargar un csv (también funcionaría para reordenar columnas si tiene que hacer esto para muchas tablas).

Puede definir un nuevo udf al agregar un column_name :

 u_f = F.udf(lambda :yourstring,StringType()) a.select(u_f().alias('column_name') 
 from pyspark.sql.functions import udf from pyspark.sql.types import * func_name = udf( lambda val: val, # do sth to val StringType() ) df.withColumn('new_col', func_name(df.old_col))