Agregar una nueva columna en el Marco de datos derivado de otras columnas (Spark)

Estoy usando Spark 1.3.0 y Python. Tengo un dataframe y deseo agregar una columna adicional que se deriva de otras columnas. Me gusta esto,

>>old_df.columns [col_1, col_2, ..., col_m] >>new_df.columns [col_1, col_2, ..., col_m, col_n] 

dónde

 col_n = col_3 - col_4 

¿Cómo hago esto en PySpark?

Una forma de lograrlo es usar el método de withColumn :

 old_df = sqlContext.createDataFrame(sc.parallelize( [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2')) new_df = old_df.withColumn('col_n', old_df.col_1 - old_df.col_2) 

Alternativamente, puede usar SQL en una tabla registrada:

 old_df.registerTempTable('old_df') new_df = sqlContext.sql('SELECT *, col_1 - col_2 AS col_n FROM old_df') 

Adicionalmente, podemos usar udf.

 from pyspark.sql.functions import udf,col from pyspark.sql.types import IntegerType from pyspark import SparkContext from pyspark.sql import SQLContext sc = SparkContext() sqlContext = SQLContext(sc) old_df = sqlContext.createDataFrame(sc.parallelize( [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2')) function = udf(lambda col1, col2 : col1-col2, IntegerType()) new_df = old_df.withColumn('col_n',function(col('col_1'), col('col_2'))) new_df.show()