Pyspark: analizar una columna de cadenas json

Tengo un dataframe pyspark que consta de una columna, llamada json , donde cada fila es una cadena unicode de json. Me gustaría analizar cada fila y devolver un nuevo dataframe donde cada fila es el json analizado.

 # Sample Data Frame jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}' jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}' jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}' df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)]) 

He intentado mapear sobre cada fila con json.loads :

 (df .select('json') .rdd .map(lambda x: json.loads(x)) .toDF() ).show() 

Pero esto devuelve un TypeError: expected string or buffer

Sospecho que parte del problema es que al convertir de un dataframe de dataframe a un rdd , la información del esquema se pierde, por lo que también he intentado ingresar manualmente la información del esquema:

 schema = StructType([StructField('json', StringType(), True)]) rdd = (df .select('json') .rdd .map(lambda x: json.loads(x)) ) new_df = sql_context.createDataFrame(rdd, schema) new_df.show() 

Pero me sale el mismo TypeError .

Mirando esta respuesta , parece que aplanar las filas con flatMap podría ser útil aquí, pero tampoco estoy teniendo éxito con eso:

 schema = StructType([StructField('json', StringType(), True)]) rdd = (df .select('json') .rdd .flatMap(lambda x: x) .flatMap(lambda x: json.loads(x)) .map(lambda x: x.get('body')) ) new_df = sql_context.createDataFrame(rdd, schema) new_df.show() 

Recibo este error: AttributeError: 'unicode' object has no attribute 'get' .

Convertir un dataframe con cadenas json en un dataframe estructurado es, en realidad, bastante simple si se convierte el dataframe a RDD de cadenas (consulte: http://spark.apache.org/docs/latest/sql-programming-guide). html # json-datasets )

Por ejemplo:

 >>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json)) >>> new_df.printSchema() root |-- body: struct (nullable = true) | |-- id: long (nullable = true) | |-- name: string (nullable = true) | |-- sub_json: struct (nullable = true) | | |-- id: long (nullable = true) | | |-- sub_sub_json: struct (nullable = true) | | | |-- col1: long (nullable = true) | | | |-- col2: string (nullable = true) |-- header: struct (nullable = true) | |-- foo: string (nullable = true) | |-- id: long (nullable = true) 

Para Spark 2.1+ , puede usar from_json que permite la preservación de las otras columnas no json dentro del dataframe de la siguiente manera:

 from pyspark.sql.functions import from_json json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema df.withColumn('json', from_json(col('json'), json_schema)) 

Dejas que Spark derive el esquema de la columna de la cadena json. Entonces, la columna df.json ya no es un StringType, pero la estructura json decodificada correctamente, es decir, el tipo de estructura nested y todas las demás columnas de df se conservan tal cual.

Puede acceder al contenido json de la siguiente manera:

 df.select(col('json.header').alias('header')) 

Las respuestas existentes no funcionan si su JSON no tiene el formato perfecto / tradicional. Por ejemplo, la inferencia de esquema basada en RDD espera JSON entre llaves {} y proporcionará un esquema incorrecto (que da como resultado valores null ) si, por ejemplo, sus datos parecen:

 [ { "a": 1.0, "b": 1 }, { "a": 0.0, "b": 2 } ] 

Escribí una función para solucionar este problema desinfectando JSON de modo que viva en otro objeto JSON:

 def parseJSONCols(df, *cols, sanitize=True): """Auto infer the schema of a json column and parse into a struct. rdd-based schema inference works if you have well-formatted JSON, like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you can fix everything by wrapping the data in another JSON object (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True) automatically performs the wrapping and unwrapping. The schema inference is based on this `SO Post `_. Parameters ---------- df : pyspark dataframe Dataframe containing the JSON cols. *cols : string(s) Names of the columns containing JSON. sanitize : boolean Flag indicating whether you'd like to sanitize your records by wrapping and unwrapping them in another JSON object layer. Returns ------- pyspark dataframe A dataframe with the decoded columns. """ res = df for i in cols: # sanitize if requested. if sanitize: res = ( res.withColumn( i, psf.concat(psf.lit('{"data": '), i, psf.lit('}')) ) ) # infer schema and apply it schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema res = res.withColumn(i, psf.from_json(psf.col(i), schema)) # unpack the wrapped object if needed if sanitize: res = res.withColumn(i, psf.col(i).data) return res 

Nota: psf = pyspark.sql.functions .

Aquí hay una versión concisa (spark SQL) de la función parseJSONCols de @ nolan-conaway.

 SELECT explode( from_json( concat('{"data":', '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', '}'), 'data array>' ).data) as data; 

PD. También he añadido la función de explosión: P

Necesitarás conocer algunos tipos de HIVE SQL.