¿Cómo guardar un enorme dataframe de pandas en formato PDF?

Estoy trabajando con pandas y con marcos de datos de chispas. Los marcos de datos son siempre muy grandes (> 20 GB) y las funciones de chispa estándar no son suficientes para esos tamaños. Actualmente estoy convirtiendo mi dataframe de pandas en un dataframe de chispa como este:

dataframe = spark.createDataFrame(pandas_dataframe) 

Hago esa transformación porque con la escritura de chispa de los marcos de datos a hdfs es muy fácil:

 dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy") 

Pero la transformación está fallando para los marcos de datos que son más grandes que 2 GB. Si transformo un dataframe de chispa en pandas puedo usar pyarrow:

 // temporary write spark dataframe to hdfs dataframe.write.parquet(path, mode="overwrite", compression="snappy") // open hdfs connection using pyarrow (pa) hdfs = pa.hdfs.connect("default", 0) // read parquet (pyarrow.parquet (pq)) parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs) table = parquet.read(nthreads=4) // transform table to pandas pandas = table.to_pandas(nthreads=4) // delete temp files hdfs.delete(path, recursive=True) 

Esta es una conversión rápida de chispa a pandas y también funciona para marcos de datos de más de 2 GB. Todavía no pude encontrar una manera de hacerlo al revés. Lo que significa tener un dataframe de pandas que transformo para encender con la ayuda de pyarrow. El problema es que realmente no puedo encontrar la forma de escribir un dataframe de pandas en hdfs.

Mi versión de los pandas: 0.19.0

    Lo que significa tener un dataframe de pandas que transformo para encender con la ayuda de pyarrow.

    pyarrow.Table.fromPandas es la función que está buscando:

     Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True) Convert pandas.DataFrame to an Arrow Table 
     import pyarrow as pa pdf = ... # type: pandas.core.frame.DataFrame adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table 

    El resultado se puede escribir directamente en Parquet / HDFS sin pasar datos a través de Spark:

     import pyarrow.parquet as pq fs = pa.hdfs.connect() with fs.open(path, "wb") as fw pq.write_table(adf, fw) 

    Ver también

    • @WesMcKinney responde para leer archivos de parquet de HDFS usando PyArrow .
    • Leyendo y escribiendo el formato de parquet de Apache en la documentación de pyarrow .
    • Conectividad del sistema de archivos nativo Hadoop (HDFS) en Python

    Notas de chispa :

    Además, como Spark 2.3 (maestro actual), Arrow se admite directamente en createDataFrame ( SPARK-20791 – Use Apache Arrow para mejorar Spark createDataFrame desde Pandas.DataFrame ). Utiliza SparkContext.defaultParallelism para calcular la cantidad de fragmentos para que pueda controlar fácilmente el tamaño de lotes individuales.

    Finalmente, el defaultParallelism se puede usar para controlar la cantidad de particiones generadas usando el _convert_from_pandas estándar, reduciendo efectivamente el tamaño de las rebanadas a algo más manejable.

    Desafortunadamente, es poco probable que estos resuelvan sus problemas de memoria actuales . Ambos dependen de la parallelize , por lo tanto almacenan todos los datos en la memoria del nodo del controlador. Cambiar a Arrow o ajustar la configuración solo puede acelerar el proceso o abordar las limitaciones de tamaño del bloque.

    En la práctica, no veo ninguna razón para cambiar a Spark aquí, siempre y cuando utilices Pandas DataFrame local como entrada. El cuello de botella más grave en este escenario es la E / S de red del controlador y la distribución de datos no solucionará eso.

    Desde https://issues.apache.org/jira/browse/SPARK-6235

    Soporte para paralelizar R data.frame más grande que 2GB

    esta resuelto.

    Desde https://pandas.pydata.org/pandas-docs/stable/r_interface.html

    Convertir DataFrames en objetos R

    puede convertir un dataframe de pandas en un R data.frame

    Entonces, tal vez los pandas de transformación -> R -> Spark -> hdfs?

    Otra forma es convertir el dataframe de pandas para generar el dataframe (usando pyspark) y guardarlo en hdfs con el comando de guardar. ejemplo

      df = pd.read_csv("data/as/foo.csv") df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str) sc = SparkContext(conf=conf) sqlCtx = SQLContext(sc) sdf = sqlCtx.createDataFrame(df) 

    Aquí astype cambia el tipo de su columna de object a string . Esto evita que se genere una excepción, ya que la chispa no pudo descubrir el tipo de object pandas. Pero asegúrese de que estas columnas realmente sean de tipo cadena.

    Ahora para guardar tu df en hdfs:

      sdf.write.csv('mycsv.csv') 

    Un truco podría ser crear marcos de datos de N pandas (cada uno menos de 2 GB) (partición horizontal) desde el grande y crear N marcos de datos de chispa diferentes, luego fusionarlos (unirlos) para crear uno final para escribir en HDFS. Supongo que su máquina maestra es potente, pero también tiene disponible un clúster en el que está ejecutando Spark.