Renombrar campo nested en el dataframe de chispa

Tener un df dataframe en Spark:

  |-- array_field: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- a: string (nullable = true) | | |-- b: long (nullable = true) | | |-- c: long (nullable = true) 

¿Cómo cambiar el nombre del campo array_field.a a array_field.a_renamed ?

[Actualizar]:

.withColumnRenamed() no funciona con campos nesteds, por lo que probé este método intrépido e inseguro:

 # First alter the schema: schema = df.schema schema['array_field'].dataType.elementType['a'].name = 'a_renamed' ind = schema['array_field'].dataType.elementType.names.index('a') schema['array_field'].dataType.elementType.names[ind] = 'a_renamed' # Then set dataframe's schema with altered schema df._schema = schema 

Sé que establecer un atributo privado no es una buena práctica, pero no conozco otra forma de establecer el esquema para df

    Creo que estoy en el camino correcto, pero df.printSchema() aún muestra el nombre antiguo de array_field.a , aunque df.schema == schema es True

    Pitón

    No es posible modificar un solo campo nested. Tienes que recrear toda una estructura. En este caso particular, la solución más simple es usar cast .

    Primero un montón de importaciones:

     from collections import namedtuple from pyspark.sql.functions import col from pyspark.sql.types import ( ArrayType, LongType, StringType, StructField, StructType) 

    y datos de ejemplo:

     Record = namedtuple("Record", ["a", "b", "c"]) df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"]) 

    Confirmemos que el esquema es el mismo que en su caso:

     df.printSchema() 
     root |-- array_field: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- a: string (nullable = true) | | |-- b: long (nullable = true) | | |-- c: long (nullable = true) 

    Puede definir un nuevo esquema, por ejemplo, como una cadena:

     str_schema = "array>" df.select(col("array_field").cast(str_schema)).printSchema() 
     root |-- array_field: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- a_renamed: string (nullable = true) | | |-- b: long (nullable = true) | | |-- c: long (nullable = true) 

    o un tipo de DataType :

     struct_schema = ArrayType(StructType([ StructField("a_renamed", StringType()), StructField("b", LongType()), StructField("c", LongType()) ])) df.select(col("array_field").cast(struct_schema)).printSchema() 
     root |-- array_field: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- a_renamed: string (nullable = true) | | |-- b: long (nullable = true) | | |-- c: long (nullable = true) 

    Scala

    Las mismas técnicas se pueden utilizar en Scala:

     case class Record(a: String, b: Long, c: Long) val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field") val strSchema = "array>" df.select($"array_field".cast(strSchema)) 

    o

     import org.apache.spark.sql.types._ val structSchema = ArrayType(StructType(Seq( StructField("a_renamed", StringType), StructField("b", LongType), StructField("c", LongType) ))) df.select($"array_field".cast(structSchema)) 

    Posibles mejoras :

    Si utiliza una manipulación de datos expresiva o una biblioteca de procesamiento JSON, podría ser más fácil volcar los tipos de datos a dict o cadenas JSON y tomarlos desde allí, por ejemplo (Python / toolz ):

     from toolz.curried import pipe, assoc_in, update_in, map from operator import attrgetter # Update name to "a_updated" if name is "a" rename_field = update_in( keys=["name"], func=lambda x: "a_updated" if x == "a" else x) updated_schema = pipe( # Get schema of the field as a dict df.schema["array_field"].jsonValue(), # Update fields with rename update_in( keys=["type", "elementType", "fields"], func=lambda x: pipe(x, map(rename_field), list)), # Load schema from dict StructField.fromJson, # Get data type attrgetter("dataType")) df.select(col("array_field").cast(updated_schema)).printSchema()