Cargar archivo CSV con Spark

Soy nuevo en Spark y estoy tratando de leer datos CSV de un archivo con Spark. Esto es lo que estoy haciendo:

sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() 

Espero que esta llamada me dé una lista de las dos primeras columnas de mi archivo, pero recibo este error:

 File "", line 1, in  IndexError: list index out of range 

aunque mi archivo CSV como más de una columna.

¿Estás seguro de que todas las líneas tienen al menos 2 columnas? ¿Puedes probar algo como, solo para comprobar ?:

 sc.textFile("file.csv") \ .map(lambda line: line.split(",")) \ .filter(lambda line: len(line)>1) \ .map(lambda line: (line[0],line[1])) \ .collect() 

Alternativamente, puede imprimir el culpable (si existe):

 sc.textFile("file.csv") \ .map(lambda line: line.split(",")) \ .filter(lambda line: len(line)<=1) \ .collect() 

Spark 2.0.0+

Puede usar la fuente de datos csv incorporada directamente:

 spark.read.csv( "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema ) 

o

 (spark.read .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .csv("some_input_file.csv")) 

Sin incluir dependencias externas.

Chispa <2.0.0 :

En lugar de un análisis manual, que está lejos de ser trivial en un caso general, recomendaría spark-csv :

Asegúrate de que Spark CSV esté incluido en la ruta ( --packages , --jars , --driver-class-path )

Y cargue sus datos de la siguiente manera:

 (df = sqlContext .read.format("com.databricks.spark.csv") .option("header", "true") .option("inferschema", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv")) 

Puede manejar la carga, la inferencia de esquemas, la eliminación de líneas mal formadas y no requiere pasar datos de Python a la JVM.

Nota :

Si conoce el esquema, es mejor evitar la inferencia del esquema y pasarlo a DataFrameReader . Suponiendo que tiene tres columnas: entero, doble y cadena:

 from pyspark.sql.types import StructType, StructField from pyspark.sql.types import DoubleType, IntegerType, StringType schema = StructType([ StructField("A", IntegerType()), StructField("B", DoubleType()), StructField("C", StringType()) ]) (sqlContext .read .format("com.databricks.spark.csv") .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv")) 
 from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|"); print(df.collect()) 

Y aún otra opción que consiste en leer el archivo CSV usando Pandas y luego importar Pandas DataFrame en Spark.

Por ejemplo:

 from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext('local','example') # if using locally sql_sc = SQLContext(sc) pandas_df = pd.read_csv('file.csv') # assuming the file contains a header # pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header s_df = sql_sc.createDataFrame(pandas_df) 

Simplemente dividir por coma también dividirá las comas que están dentro de los campos (por ejemplo a,b,"1,2,3",c ), por lo que no se recomienda. La respuesta de zero323 es buena si desea utilizar la API DataFrames, pero si desea mantener el Spark base, puede analizar csvs en Python base con el módulo csv :

 # works for both python 2 and 3 import csv rdd = sc.textFile("file.csv") rdd = rdd.mapPartitions(lambda x: csv.reader(x)) 

EDITAR: Como @muon se menciona en los comentarios, esto tratará el encabezado como cualquier otra fila, por lo que deberá extraerlo manualmente. Por ejemplo, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (asegúrese de no modificar el header antes de que se evalúe el filtro). Pero en este punto, probablemente esté mejor usando un analizador CSV incorporado.

Esto está en línea con lo que JP Mercier sugirió inicialmente sobre el uso de Pandas, pero con una modificación importante: si lees datos en Pandas en partes, debería ser más maleable. Lo que significa que puedes analizar un archivo mucho más grande de lo que Pandas puede manejar como una sola pieza y pasarlo a Spark en tamaños más pequeños. (Esto también responde al comentario sobre por qué uno querría usar Spark si pueden cargar todo en Pandas de todos modos).

 from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext('local','example') # if using locally sql_sc = SQLContext(sc) Spark_Full = sc.emptyRDD() chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000) # if you have headers in your csv file: headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns) for chunky in chunk_100k: Spark_Full += sc.parallelize(chunky.values.tolist()) YourSparkDataFrame = Spark_Full.toDF(headers) # if you do not have headers, leave empty instead: # YourSparkDataFrame = Spark_Full.toDF() YourSparkDataFrame.show() 

Ahora, también hay otra opción para cualquier archivo csv general: https://github.com/seahboonsiew/pyspark-csv de la siguiente manera:

Supongamos que tenemos el siguiente contexto

 sc = SparkContext sqlCtx = SQLContext or HiveContext 

Primero, distribuya pyspark-csv.py a los ejecutores usando SparkContext

 import pyspark_csv as pycsv sc.addPyFile('pyspark_csv.py') 

Lea datos csv a través de SparkContext y conviértalos a DataFrame

 plaintext_rdd = sc.textFile('hdfs://xxxx/blah.csv') dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd) 

Si sus datos csv no contienen nuevas líneas en ninguno de los campos, puede cargar sus datos con textFile() y analizarlos.

 import csv import StringIO def loadRecord(line): input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name1", "name2"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord) 

Si desea cargar csv como un dataframe, puede hacer lo siguiente:

 from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format('com.databricks.spark.csv') \ .options(header='true', inferschema='true') \ .load('sampleFile.csv') # this is your csv file 

Funcionó bien para mí.

Si tiene una o más filas con menos o más números de columnas que 2 en el conjunto de datos, puede aparecer este error.

También soy nuevo en Pyspark e bash leer el archivo CSV. El siguiente código funcionó para mí:

En este código que estoy usando el conjunto de datos de kaggle, el enlace es: https://www.kaggle.com/carrie1/ecommerce-data

1. Sin mencionar el esquema:

 from pyspark.sql import SparkSession scSpark = SparkSession \ .builder \ .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() sdfData = scSpark.read.csv("data.csv", header=True, sep=",") sdfData.show() 

Ahora revisa las columnas: sdfData.columns

La salida será:

 ['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country'] 

Compruebe el tipo de datos para cada columna:

 sdfData.schema StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true))) 

Esto le dará al dataframe todas las columnas con tipo de datos como StringType

2. Con esquema: si conoce el esquema o desea cambiar el tipo de datos de cualquier columna en la tabla anterior, entonces use esto (digamos que tengo las siguientes columnas y las quiero en un tipo de datos en particular para cada una de ellas)

 from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField from pyspark.sql.types import DoubleType, IntegerType, StringType schema = StructType([\ StructField("InvoiceNo", IntegerType()),\ StructField("StockCode", StringType()), \ StructField("Description", StringType()),\ StructField("Quantity", IntegerType()),\ StructField("InvoiceDate", StringType()),\ StructField("CustomerID", DoubleType()),\ StructField("Country", StringType())\ ]) scSpark = SparkSession \ .builder \ .appName("Python Spark SQL example: Reading CSV file with schema") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema) 

Ahora verifique el esquema para el tipo de datos de cada columna:

 sdfData.schema StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true))) 

Editado: podemos usar la siguiente línea de código también sin mencionar el esquema explícitamente:

 sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True) sdfData.schema 

La salida es:

 StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true))) 

La salida se verá así:

 sdfData.show() +---------+---------+--------------------+--------+--------------+----------+-------+ |InvoiceNo|StockCode| Description|Quantity| InvoiceDate|CustomerID|Country| +---------+---------+--------------------+--------+--------------+----------+-------+ | 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850| | 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850| | 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850| | 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850| | 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850| | 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850| | 536365| 21730|GLASS STAR FROSTE...| 6|12/1/2010 8:26| 4.25| 17850| | 536366| 22633|HAND WARMER UNION...| 6|12/1/2010 8:28| 1.85| 17850| | 536366| 22632|HAND WARMER RED P...| 6|12/1/2010 8:28| 1.85| 17850| | 536367| 84879|ASSORTED COLOUR B...| 32|12/1/2010 8:34| 1.69| 13047| | 536367| 22745|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047| | 536367| 22748|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047| | 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/2010 8:34| 3.75| 13047| | 536367| 22310|IVORY KNITTED MUG...| 6|12/1/2010 8:34| 1.65| 13047| | 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/2010 8:34| 4.25| 13047| | 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/2010 8:34| 4.95| 13047| | 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/2010 8:34| 9.95| 13047| | 536367| 21754|HOME BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047| | 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047| | 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| 13047| +---------+---------+--------------------+--------+--------------+----------+-------+ only showing top 20 rows 
 import pandas as pd data1 = pd.read_csv("test1.csv") data2 = pd.read_csv("train1.csv")