Convertir el dataframe de Pandas en un error del dataframe de Spark

Estoy tratando de convertir Pandas DF en una chispa. Cabeza del DF:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543 10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611 10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691 

Código:

 dataset = pd.read_csv("data/AS/test_v2.csv") sc = SparkContext(conf=conf) sqlCtx = SQLContext(sc) sdf = sqlCtx.createDataFrame(dataset) 

Y me salió un error:

 TypeError: Can not merge type  and  

Debe asegurarse de que las columnas de dataframe de pandas sean adecuadas para el tipo de chispa que se infiere. Si su dataframe pandas enumera algo como:

 pd.info()  RangeIndex: 5062 entries, 0 to 5061 Data columns (total 51 columns): SomeCol 5062 non-null object Col2 5062 non-null object 

Y estás recibiendo ese error, prueba:

 df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str) 

Ahora, asegúrese de que .astype(str) sea ​​realmente el tipo que desea que sean esas columnas. Básicamente, cuando el código Java subyacente intenta inferir el tipo de un objeto en python, utiliza algunas observaciones y realiza una conjetura, si esa estimación no se aplica a todos los datos en la (s) columna (s) que está intentando convertir de pandas a chispa va a fallar.

Los errores relacionados con los tipos se pueden evitar imponiendo un esquema de la siguiente manera:

nota : se creó un archivo de texto ( test.csv ) con los datos originales (como arriba) y se insertaron nombres de columna hipotéticos (“col1”, “col2”, …, “col25”).

 import pyspark from pyspark.sql import SparkSession import pandas as pd spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate() pdDF = pd.read_csv("test.csv") 

contenidos del dataframe pandas:

 pdDF col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 ... col16 col17 col18 col19 col20 col21 col22 col23 col24 col25 0 10000001 1 0 1 12:35 OK 10002 1 0 9 ... 3 9 0 0 1 1 0 0 4 543 1 10000001 2 0 1 12:36 OK 10002 1 0 9 ... 3 9 2 1 1 3 1 3 2 611 2 10000002 1 0 4 12:19 PA 10003 1 1 7 ... 2 15 2 0 2 3 1 2 2 691 

A continuación, crea el esquema:

 from pyspark.sql.types import * mySchema = StructType([ StructField("Col1", LongType(), True)\ ,StructField("Col2", IntegerType(), True)\ ,StructField("Col3", IntegerType(), True)\ ,StructField("Col4", IntegerType(), True)\ ,StructField("Col5", StringType(), True)\ ,StructField("Col6", StringType(), True)\ ,StructField("Col7", IntegerType(), True)\ ,StructField("Col8", IntegerType(), True)\ ,StructField("Col9", IntegerType(), True)\ ,StructField("Col10", IntegerType(), True)\ ,StructField("Col11", StringType(), True)\ ,StructField("Col12", StringType(), True)\ ,StructField("Col13", IntegerType(), True)\ ,StructField("Col14", IntegerType(), True)\ ,StructField("Col15", IntegerType(), True)\ ,StructField("Col16", IntegerType(), True)\ ,StructField("Col17", IntegerType(), True)\ ,StructField("Col18", IntegerType(), True)\ ,StructField("Col19", IntegerType(), True)\ ,StructField("Col20", IntegerType(), True)\ ,StructField("Col21", IntegerType(), True)\ ,StructField("Col22", IntegerType(), True)\ ,StructField("Col23", IntegerType(), True)\ ,StructField("Col24", IntegerType(), True)\ ,StructField("Col25", IntegerType(), True)]) 

nota : True (implica nullable permitido)

crear el dataframe pyspark:

 df = spark.createDataFrame(pdDF,schema=mySchema) 

confirme que el dataframe de pandas ahora es un dataframe de pyspark:

 type(df) 

salida:

 pyspark.sql.dataframe.DataFrame 

He intentado esto con tus datos y está funcionando:

 %pyspark import pandas as pd from pyspark.sql import SQLContext print sc df = pd.read_csv("test.csv") print type(df) print df sqlCtx = SQLContext(sc) sqlCtx.createDataFrame(df).show() 

Una vez recibí un mensaje de error similar, en mi caso fue porque el dataframe de mis pandas contenía NULL. Recomendaré probar y manejar esto en pandas antes de convertirlo en chispa (esto resolvió el problema en mi caso).