pyspark EOFError después de llamar al mapa

Soy nuevo en spark & ​​pyspark.

Estoy leyendo un pequeño archivo csv (~ 40k) en un dataframe.

from pyspark.sql import functions as F df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv') df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF() 

Me sale un error extraño que no ocurre todas las veces, pero ocurre con bastante frecuencia

 >>> df2.show(1) +--------------------+---------+ | features| label| +--------------------+---------+ |[0.0,0.0,0.0,0.0,...|4700734.0| +--------------------+---------+ only showing top 1 row >>> df2.count() 41999 >>> df2.show(1) +--------------------+---------+ | features| label| +--------------------+---------+ |[0.0,0.0,0.0,0.0,...|4700734.0| +--------------------+---------+ only showing top 1 row >>> df2.count() 41999 >>> df2.show(1) Traceback (most recent call last): File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main if read_int(infile) == SpecialLengths.END_OF_STREAM: File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int raise EOFError EOFError +--------------------+---------+ | features| label| +--------------------+---------+ |[0.0,0.0,0.0,0.0,...|4700734.0| +--------------------+---------+ only showing top 1 row 

Una vez que se haya generado EOFError, no lo volveré a ver hasta que haga algo que requiera interactuar con el servidor de chispa.

Cuando llamo a df2.count () muestra que el indicador [Etapa xxx], que es lo que quiero decir con eso, va al servidor de chispa. Cualquier cosa que se dispare que parezca que finalmente termine dándole a EOFError nuevamente cuando hago algo con df2.

No parece que suceda con df (vs. df2), así que parece que debe ser algo que sucede con la línea df.map ().

¿Puedes por favor intentar hacer un mapa después de convertir el dataframe en rdd? Está aplicando la función de mapa en un dataframe y luego de nuevo creando un dataframe a partir de eso. La syntax sería como

 df.rdd.map().toDF() 

Por favor, déjeme saber si funciona. Gracias.

Creo que estás ejecutando Spark 2.x y superior. Debajo del código debe crear su dataframe desde csv:

 df = spark.read.format("csv").option("header", "true").load("csvfile.csv") 

entonces puedes tener el siguiente código:

 df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) 

y luego puedes crear df2 sin Fila y toDF ()

Avíseme si esto funciona o si está utilizando Spark 1.6 … gracias.