Actualización de una columna de dataframe en chispa

En cuanto a la nueva api de marcos de datos de chispa, no está claro si es posible modificar las columnas de marcos de datos.

¿Cómo haría para cambiar un valor en la fila x columna y de un dataframe?

En pandas esto sería df.ix[x,y] = new_value

Edición: Consolidando lo que se dijo a continuación, no puede modificar el dataframe existente ya que es inmutable, pero puede devolver un nuevo dataframe con las modificaciones deseadas.

Si solo desea reemplazar un valor en una columna basado en una condición, como np.where :

 from pyspark.sql import functions as F update_func = (F.when(F.col('update_col') == replace_val, new_value) .otherwise(F.col('update_col'))) df = df.withColumn('new_column_name', update_func) 

Si desea realizar alguna operación en una columna y crear una nueva columna que se agregue al dataframe:

 import pyspark.sql.functions as F import pyspark.sql.types as T def my_func(col): do stuff to column here return transformed_value # if we assume that my_func returns a string my_udf = F.UserDefinedFunction(my_func, T.StringType()) df = df.withColumn('new_column_name', my_udf('update_col')) 

Si desea que la nueva columna tenga el mismo nombre que la columna anterior, puede agregar el paso adicional:

 df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col') 

Si bien no puede modificar una columna como tal, puede operar en una columna y devolver un nuevo DataFrame que refleje ese cambio. Para eso, primero debe crear un UserDefinedFunction implemente la operación para aplicar y luego aplicar selectivamente esa función solo a la columna de destino. En Python:

 from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import StringType name = 'target_column' udf = UserDefinedFunction(lambda x: 'new_value', StringType()) new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns]) 

new_df ahora tiene el mismo esquema que old_df (asumiendo que old_df.target_column era de tipo StringType ) pero todos los valores en la columna target_column serán target_column .

Normalmente al actualizar una columna, queremos asignar un valor antiguo a un nuevo valor. Aquí hay una manera de hacer eso en pyspark sin UDF:

 # update df[update_col], mapping old_value --> new_value from pyspark.sql import functions as F df = df.withColumn(update_col, F.when(df[update_col]==old_value,new_value). otherwise(df[update_col])). 

DataFrames están basados ​​en RDDs. Los RDD son estructuras inmutables y no permiten la actualización de elementos en el sitio. Para cambiar los valores, deberá crear un nuevo DataFrame transformando el original usando el DSL similar a SQL o las operaciones RDD como el map .

Una plataforma de diapositivas altamente recomendada: Introducción de DataFrames en Spark para la ciencia de datos a gran escala .

Al igual que maasg dice, puede crear un nuevo DataFrame a partir del resultado de un mapa aplicado al DataFrame antiguo. Un ejemplo para un df DataFrame dado con dos filas:

 val newDf = sqlContext.createDataFrame(df.map(row => Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema) 

Tenga en cuenta que si los tipos de las columnas cambian, debe darle un esquema correcto en lugar de df.schema . Consulte la api de org.apache.spark.sql.Row para conocer los métodos disponibles: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Actualización] O usando UDF en Scala:

 import org.apache.spark.sql.functions._ val toLong = udf[Long, String] (_.toLong) val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName") 

y si el nombre de la columna debe permanecer igual, puede renombrarlo de nuevo:

 modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")