Spark RDD – Mapeo con argumentos extra

¿Es posible pasar argumentos adicionales a la función de mapeo en pySpark? Específicamente, tengo la siguiente receta de código:

raw_data_rdd = sc.textFile("data.json", use_unicode=True) json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line)) mapped_rdd = json_data_rdd.flatMap(processDataLine) 

La función processDataLine toma argumentos adicionales además del objeto JSON, como:

 def processDataLine(dataline, arg1, arg2) 

¿Cómo puedo pasar los argumentos adicionales arg1 y arg2 a la función flaMap ?

  1. Puedes usar una función anónima directamente en un flatMap

     json_data_rdd.flatMap(lambda j: processDataLine(j, arg1, arg2)) 

    o para curry processDataLine

     f = lambda j: processDataLine(dataline, arg1, arg2) json_data_rdd.flatMap(f) 
  2. Puede generar processDataLine esta manera:

     def processDataLine(arg1, arg2): def _processDataLine(dataline): return ... # Do something with dataline, arg1, arg2 return _processDataLine json_data_rdd.flatMap(processDataLine(arg1, arg2)) 
  3. toolz biblioteca de herramientas proporciona un útil decorador de curry :

     from toolz.functoolz import curry @curry def processDataLine(arg1, arg2, dataline): return ... # Do something with dataline, arg1, arg2 json_data_rdd.flatMap(processDataLine(arg1, arg2)) 

    Tenga en cuenta que he empujado el argumento de línea de datos a la última posición. No es obligatorio, pero de esta manera no tenemos que usar argumentos de palabras clave.

  4. Finalmente hay functools.partial ya mencionado por Avihoo Mamka en los comentarios.