Filtro RDD basado en row_number

sc.textFile (ruta) permite leer un archivo HDFS pero no acepta parámetros (como omitir varias filas, has_headers, …).

en el libro electrónico O’Reilly de “Learning Spark”, se sugiere usar la siguiente función para leer un CSV (Ejemplo 5-12. Ejemplo de carga de Python CSV)

import csv import StringIO def loadRecord(line): """Parse a CSV line""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord) 

Mi pregunta es sobre cómo ser selectivo acerca de las filas “tomadas”:

  1. Cómo evitar cargar la primera fila (los encabezados)
  2. Cómo eliminar una fila específica (por ejemplo, la fila 5)

Veo algunas soluciones decentes aquí: seleccione una gama de elementos, pero me gustaría ver si hay algo más simple.

¡Gracias!

No se preocupe por cargar las filas / líneas que no necesita. Cuando tu lo hagas:

 input = sc.textFile(inputFile) 

no estas cargando el archivo Acaba de obtener un objeto que le permitirá operar en el archivo. Entonces, para ser eficiente, es mejor pensar en términos de obtener solo lo que usted desea. Por ejemplo:

 header = input.take(1)[0] rows = input.filter(lambda line: line != header) 

Tenga en cuenta que aquí no estoy usando un índice para referirme a la línea que quiero soltar, sino a su valor. Esto tiene el efecto secundario de que otras líneas con este valor también serán ignoradas, pero está más en el espíritu de Spark, ya que Spark distribuirá su archivo de texto en diferentes partes a través de los nodos y el concepto de números de línea se pierde en cada partición. Esta es también la razón por la que esto no es fácil de hacer en Spark (Hadoop), ya que cada partición debe considerarse independiente y un número de línea global rompería esta suposición.

Si realmente necesita trabajar con números de línea, le recomiendo que los agregue al archivo fuera de Spark (consulte aquí ) y luego simplemente filtre por esta columna dentro de Spark.

Edición : Se zipWithIndex solución zipWithIndex como lo sugirió @Daniel Darabos.

 sc.textFile('test.txt')\ .zipWithIndex()\ # [(u'First', 0), (u'Second', 1), ... .filter(lambda x: x[1]!=5)\ # select columns .map(lambda x: x[0])\ # [u'First', u'Second' .collect()