Problema al crear una lista global desde el mapa usando PySpark

Tengo este código donde estoy leyendo un archivo en ipython usando pyspark . Lo que estoy tratando de hacer es agregarle una pieza que forme una lista basada en una columna particular leída del archivo, pero cuando bash ejecutarlo, la lista aparece vacía y no se le anexa nada. Mi código es:

 list1 = [] def file_read(line): list1.append(line[10]) # bunch of other code which process other column indexes on `line` inputData = sc.textFile(fileName).zipWithIndex().filter(lambda (line,rownum): rownum>0).map(lambda (line, rownum): line) column_val = (inputData .map(lambda line: line.split(",")) .filter(lambda line: len(line) >1 ) .map(file_read)) 

Cuando ejecuto esta parte del código, la list1 todavía está vacía, aunque hay datos en la line[10] ya que la estoy utilizando en otras partes del código en la misma función anterior. Parece como si no lo estuviera agregando a la lista. ¿Cómo puedo formar la lista de arriba?

Bueno, en realidad se agrega a la list1 , el problema no es al que estás pensando. Todas las variables a las que se hace referencia en los cierres se serializan y se envían a los trabajadores. Se aplica a list1 también.

Cada partición recibe su propia copia de la list1 , cuando se llama file_read datos se agregan a esta copia, y cuando se finaliza una fase de mapa dada, queda fuera del scope y se descarta.

No es un código especialmente elegante, pero deberías ver que es realmente lo que está sucediendo aquí:

 rdd = sc.parallelize(range(100), 5) line1 = [] def file_read(line): list1.append(line) print len(list1) return line xs = rdd.map(file_read).collect() 

Editar

Spark proporciona dos tipos de variable compartida. Variables de difusión , que se leen solo desde la perspectiva del trabajador, y acumuladores que se escriben solo desde la perspectiva del conductor.

Por defecto, los acumuladores solo admiten variables numéricas y están pensados ​​para usarse principalmente como contadores. Sin embargo, es posible definir acumuladores personalizados. Para hacer esto, debe ampliar la clase AccumulatorParam y proporcionar implementaciones personalizadas de zero y addInPlace :

 class ListParam(AccumulatorParam): def zero(self, v): return [] def addInPlace(self, acc1, acc2): acc1.extend(acc2) return acc1 

A continuación, puede redefinir file_read siguiente manera:

 def file_read1(line): global list1 # Required otherwise the next line will fail list1 += [line] return line 

Ejemplo de uso:

 list1 = sc.accumulator([], ListParam()) rdd = sc.parallelize(range(10)).map(file_read1).collect() list1.value 

Incluso si es posible usar un acumulador como este, es probable que sea muy costoso usarlo en la práctica y en el peor de los casos puede bloquear el controlador. En su lugar, simplemente puede utilizar otra transformación:

 tmp = (inputData .map(lambda line: line.split(",")) .filter(lambda line: len(line) >1 )) def line_read2(line): return ... # Just a core logic line1 = tmp.map(lambda line: line[10]) column_val = tmp.map(line_read2) 

Nota al margen :

El código que has proporcionado no hace nada. Las transformaciones en Spark son solo las descripciones de lo que se debe hacer, pero hasta que no se llama un dato de acción, nada se ejecuta realmente.