Cómo convertir la cadena a ArrayType del diccionario (JSON) en PySpark

Intentando convertir StringType en ArrayType de JSON para un CSV generado a partir de un dataframe.

Usando pyspark en Spark2

El archivo CSV que estoy tratando; es como sigue –

 date,attribute2,count,attribute3 2017-09-03,'attribute1_value1',2,'[{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]' 2017-09-04,'attribute1_value2',2,'[{"key":"value","key2":20},{"key":"value","key2":25},{"key":"value","key2":27}]' 

Como se muestra arriba, contiene un atributo "attribute3" en cadena literal, que técnicamente es una lista de diccionario (JSON) con una longitud exacta de 2. (Esta es la salida de la función distinta)

printSchema() del printSchema()

 attribute3: string (nullable = true) 

Estoy tratando de lanzar el "attribute3" a ArrayType siguiente manera

 temp = dataframe.withColumn( "attribute3_modified", dataframe["attribute3"].cast(ArrayType()) ) 
 Traceback (most recent call last): File "", line 1, in  TypeError: __init__() takes at least 2 arguments (1 given) 

De hecho, ArrayType espera el tipo de datos como argumento. Lo intenté con "json" , pero no funcionó.

Salida deseada: al final, necesito convertir attribute3 a ArrayType() o una simple lista de Python. (Estoy tratando de evitar el uso de eval )

¿Cómo lo convierto a ArrayType para que pueda tratarlo como una lista de JSON?

¿Me estoy perdiendo algo aquí?

(La documentación , no aborda este problema de manera directa)

Utilice from_json con un esquema que coincida con los datos reales en la columna attribute3 para convertir json a ArrayType:

Marco de datos original:

 df.printSchema() #root # |-- date: string (nullable = true) # |-- attribute2: string (nullable = true) # |-- count: long (nullable = true) # |-- attribute3: string (nullable = true) from pyspark.sql.functions import from_json from pyspark.sql.types import * 

Crea el esquema :

 schema = ArrayType( StructType([StructField("key", StringType()), StructField("key2", IntegerType())])) 

Utilice from_json :

 df = df.withColumn("attribute3", from_json(df.attribute3, schema)) df.printSchema() #root # |-- date: string (nullable = true) # |-- attribute2: string (nullable = true) # |-- count: long (nullable = true) # |-- attribute3: array (nullable = true) # | |-- element: struct (containsNull = true) # | | |-- key: string (nullable = true) # | | |-- key2: integer (nullable = true) df.show(1, False) #+----------+----------+-----+------------------------------------+ #|date |attribute2|count|attribute3 | #+----------+----------+-----+------------------------------------+ #|2017-09-03|attribute1|2 |[[value, 2], [value, 2], [value, 2]]| #+----------+----------+-----+------------------------------------+ 

La respuesta de @Psidom no me funciona porque estoy usando Spark 2.1.

En mi caso, tuve que modificar un poco su cadena attribute3 para envolverla en un diccionario:

 import pyspark.sql.functions as f df2 = df.withColumn("attribute3", f.concat(f.lit('{"data": '), "attribute3", f.lit("}"))) df2.select("attribute3").show(truncate=False) #+--------------------------------------------------------------------------------------+ #|attribute3 | #+--------------------------------------------------------------------------------------+ #|{"data": [{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]}| #+--------------------------------------------------------------------------------------+ 

Ahora puedo definir el esquema de la siguiente manera:

 schema = StructType( [ StructField( "data", ArrayType( StructType( [ StructField("key", StringType()), StructField("key2", IntegerType()) ] ) ) ) ] ) 

Ahora use from_json seguido de getItem() :

 df3 = df2.withColumn("attribute3", f.from_json("attribute3", schema).getItem("data")) df3.show(truncate=False) #+----------+----------+-----+---------------------------------+ #|date |attribute2|count|attribute3 | #+----------+----------+-----+---------------------------------+ #|2017-09-03|attribute1|2 |[[value,2], [value,2], [value,2]]| #+----------+----------+-----+---------------------------------+ 

Y el esquema:

 df3.printSchema() # root # |-- attribute3: array (nullable = true) # | |-- element: struct (containsNull = true) # | | |-- key: string (nullable = true) # | | |-- key2: integer (nullable = true)