Cómo hacer la primera primera fila como encabezado al leer un archivo en PySpark y convertirlo en Pandas Dataframe

Estoy leyendo un archivo en PySpark y formando el rdd de él. Luego lo convierto a un dataframe normal y luego a un pandas dataframe . El problema que tengo es que hay una fila de encabezado en mi archivo de entrada y quiero hacer esto también como encabezado de las columnas del dataframe, pero se leen como una fila adicional y no como encabezado. Este es mi código actual:

 def extract(line): return line input_file = sc.textFile('file1.txt').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line) input_data = (input_file .map(lambda line: line.split(";")) .filter(lambda line: len(line) >=0 ) .map(extract)) # Map to tuples df_normal = input_data.toDF() df= df_normal.toPandas() 

Ahora, cuando miro el df , la fila del encabezado del archivo de texto se convierte en la primera fila del dataframe de dataframe y hay un encabezado adicional en df con 0,1,2... como encabezado. ¿Cómo puedo hacer la primera fila como encabezado?

Hay un par de maneras de hacerlo, dependiendo de la estructura exacta de sus datos. Como no proporciona ningún detalle, intentaré mostrarlo utilizando un archivo de datos nyctaxicab.csv que puede descargar .

Si su archivo está en formato csv , debe usar el paquete spark-csv relevante, provisto por Databricks. No es necesario descargarlo explícitamente, simplemente ejecute pyspark siguiente manera:

 $ pyspark --packages com.databricks:spark-csv_2.10:1.3.0 

y entonces

 >>> from pyspark.sql import SQLContext >>> from pyspark.sql.types import * >>> sqlContext = SQLContext(sc) >>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', format='com.databricks.spark.csv', header='true', inferSchema='true') >>> df.count() 249999 

El archivo tiene 250,000 filas, incluido el encabezado, por lo que 249,999 es el número correcto de registros reales. Aquí está el esquema, como se infiere automáticamente por el paquete:

 >>> df.dtypes [('_id', 'string'), ('_rev', 'string'), ('dropoff_datetime', 'string'), ('dropoff_latitude', 'double'), ('dropoff_longitude', 'double'), ('hack_license', 'string'), ('medallion', 'string'), ('passenger_count', 'int'), ('pickup_datetime', 'string'), ('pickup_latitude', 'double'), ('pickup_longitude', 'double'), ('rate_code', 'int'), ('store_and_fwd_flag', 'string'), ('trip_distance', 'double'), ('trip_time_in_secs', 'int'), ('vendor_id', 'string')] 

Puedes ver más detalles en mi blog relevante .

Si, por alguna razón, no puede usar el paquete spark-csv , tendrá que restar la primera fila de los datos y luego usarla para construir su esquema. Aquí está la idea general, y puede encontrar de nuevo un ejemplo completo con los detalles del código en otra publicación de mi blog :

 >>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv") >>> taxiFile.count() 250000 >>> taxiFile.take(5) [u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"', u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"', u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"', u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"', u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"'] # Construct the schema from the header >>> header = taxiFile.first() >>> header u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"' >>> schemaString = header.replace('"','') # get rid of the double-quotes >>> schemaString u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id' >>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')] >>> schema = StructType(fields) # Subtract header and use the above-constructed schema: >>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job >>> taxiHeader.collect() # for inspection purposes only [u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'] >>> taxiNoHeader = taxiFile.subtract(taxiHeader) >>> taxi_df = taxiNoHeader.toDF(schema) # Spark dataframe >>> import pandas as pd >>> taxi_DF = taxi_df.toPandas() # pandas dataframe 

Por brevedad, aquí todas las columnas terminan siendo de tipo string , pero en la publicación del blog muestro en detalle y explico cómo puede refinar más los tipos de datos (y nombres) deseados para campos específicos.

La respuesta simple sería set header='true'

P.ej:

 df = spark.read.csv('housing.csv', header='true') 

o

 df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")