¿Cómo exportar un cuadro de datos de la tabla en PySpark a CSV?

Estoy utilizando Spark 1.3.1 (PySpark) y he generado una tabla utilizando una consulta SQL. Ahora tengo un objeto que es un DataFrame . Quiero exportar este objeto DataFrame (lo he llamado “tabla”) a un archivo csv para poder manipularlo y trazar las columnas. ¿Cómo exporto la “tabla” de DataFrame a un archivo csv?

¡Gracias!

Si el dataframe se ajusta a una memoria de controlador y desea guardar en el sistema de archivos local, puede convertir Spark DataFrame en Pandas DataFrame local usando el método toPandas y luego usar to_csv :

 df.toPandas().to_csv('mycsv.csv') 

De lo contrario puedes usar spark-csv :

  • Chispa 1.3

     df.save('mycsv.csv', 'com.databricks.spark.csv') 
  • Spark 1.4+

     df.write.format('com.databricks.spark.csv').save('mycsv.csv') 

En Spark 2.0+ puede usar la fuente de datos csv directamente:

 df.write.csv('mycsv.csv') 

Para Apache Spark 2+, para guardar el dataframe en un solo archivo csv. Usa el siguiente comando

 query.repartition(1).write.csv("cc_out.csv", sep='|') 

Aquí 1 indica que necesito una partición de csv solamente. Puedes cambiarlo de acuerdo a tus requerimientos.

Si no puede usar spark-csv, puede hacer lo siguiente:

 df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv") 

Si necesita manejar cadenas con saltos de línea o comas que no funcionarán. Utilizar esta:

 import csv import cStringIO def row2csv(row): buffer = cStringIO.StringIO() writer = csv.writer(buffer) writer.writerow([str(s).encode("utf-8") for s in row]) buffer.seek(0) return buffer.read().strip() df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv") 

¿Qué tal esto (en ti no quieres un forro)?

 for row in df.collect(): d = row.asDict() s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"]) f.write(s) 

f es un descriptor de archivo abierto. También el separador es un carácter TAB, pero es fácil de cambiar a lo que quieras.

Debe volver a particionar el Dataframe en una sola partición y luego definir el formato, la ruta y otro parámetro del archivo en formato de sistema de archivos Unix y aquí está,

 df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true') 

Lea más acerca de la función de reparto Lea más acerca de la función de guardar