Pyspark leyó varios archivos csv en un dataframe (¿O RDD?)

Tengo un clúster Spark 2.0.2 que estoy atacando a través de Pyspark a través de Jupyter Notebook. Tengo varios archivos txt delimitados por tuberías (cargados en HDFS, pero también disponibles en un directorio local) que necesito cargar usando spark-csv en tres marcos de datos separados, dependiendo del nombre del archivo.

Veo tres enfoques que puedo tomar, o bien puedo usar python para recorrer de alguna manera el directorio HDFS (aún no he descubierto cómo hacer esto, cargar cada archivo y luego hacer una unión).

También sé que existe alguna funcionalidad comodín (consulte aquí ) en spark: probablemente pueda aprovechar

Por último, podría usar pandas para cargar el archivo csv de vainilla desde el disco como un dataframe de pandas y luego crear un dataframe de chispa. La desventaja aquí es que estos archivos son grandes, y la carga en la memoria en un solo nodo podría demorar ~ 8 gb. (Es por eso que esto se está moviendo a un clúster en primer lugar).

Aquí está el código que tengo hasta ahora y algunos pseudo códigos para los dos métodos:

import findspark findspark.init() import pyspark from pyspark.sql import SparkSession import pandas as pd sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077') spark = SparkSession(sc) #METHOD 1 - iterate over HDFS directory for currFile in os.listdir(HDFS:///someDir//): if #filename contains 'claim': #create or unionAll to merge claim_df if #filename contains 'pharm': #create or unionAll to merge pharm_df if #filename contains 'service': #create or unionAll to merge service_df #Method 2 - some kind of wildcard functionality claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*.csv') pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*.csv') service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*.csv') #METHOD 3 - load to a pandas df and then convert to spark df for currFile in os.listdir(HDFS:///someDir//) pd_df = pd.read_csv(currFile, sep = '|') df = spark.createDataFrame(pd_df) if #filename contains 'claim': #create or unionAll to merge claim_df if #filename contains 'pharm': #create or unionAll to merge pharm_df if #filename contains 'service': #create or unionAll to merge service_df 

¿Alguien sabe como implementar el método 1 o 2? No he podido resolver esto. Además, me sorprendió que no haya una mejor manera de cargar archivos csv en un dataframe de pyspark: usar un paquete de terceros para algo que parece que debería ser una característica nativa que me confundió (simplemente me perdí el caso de uso estándar para cargar archivos csv en un dataframe?) En última instancia, voy a escribir un único dataframe consolidado en HDFS (usando .write.parquet ()) para poder borrar la memoria y hacer algunos análisis utilizando MLlib. Si el enfoque que he destacado no es la mejor práctica, agradecería un impulso en la dirección correcta.

Enfoque 1:

En Python no puedes referirte directamente a la ubicación de HDFS. Necesitas tomar la ayuda de otra biblioteca como pydoop. En scala y java, tienes API. Incluso con pydoop, leerás los archivos uno por uno. Es malo leer los archivos uno por uno y no usar la opción de lectura paralela provista por spark.

Enfoque 2:

Debería poder apuntar los archivos múltiples con comas separadas o con comodines. De esta manera, spark se encarga de leer los archivos y distribuirlos en particiones. Pero si va con la opción de unión con cada dataframe, hay un caso de borde cuando lee dinámicamente cada archivo. Cuando tiene muchos archivos, la lista puede llegar a ser tan grande a nivel de controlador y puede causar problemas de memoria. La razón principal es que, el proceso de lectura todavía está ocurriendo a nivel de controlador.

Esta opción es mejor. La chispa leerá todos los archivos relacionados con expresiones regulares y los convertirá en particiones. Obtiene un RDD para todas las coincidencias de comodines y, a partir de ahí, no tiene que preocuparse por la unión de los rdd individuales.

Código de muestra cnippet:

 distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv") 

Enfoque 3:

A menos que tenga alguna aplicación heredada en python que use las características de pandas, preferiría usar la API provista por chispa