Error de lanzamiento de PySpark El método __getnewargs __ () no existe

Tengo un conjunto de archivos. La ruta a los archivos se guardan en un archivo., Diga “all_files.txt”. Usando apache spark, necesito hacer una operación en todos los archivos y agrupar los resultados.

Los pasos que quiero hacer son:

  • Cree un RDD leyendo “all_files.txt”
  • Para cada línea en “all_files.txt” (Cada línea es una ruta de acceso a algún archivo), lea el contenido de cada uno de los archivos en un solo RDD
  • Entonces haz una operación con todos los contenidos.

Este es el código que escribí para el mismo:

def return_contents_from_file (file_name): return spark.read.text(file_name).rdd.map(lambda r: r[0]) def run_spark(): file_name = 'path_to_file' spark = SparkSession \ .builder \ .appName("PythonWordCount") \ .getOrCreate() counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file .flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files .flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files 

Esto está lanzando el error:

línea 323, en get_return_value py4j.protocol.Py4JError: Ocurrió un error al llamar a o25. getnewargs . Rastreo: py4j.Py4JExcepción: El método getnewargs ([]) no existe en py4j.reflection.ReflectionEngine.getMethod (ReflectionEngine.java:318) en py4j.reflection.getMethod (ReflectionEngine.payas.payas). invoke (Gateway.java:272) en py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) en py4j.commands.CallCommand.execute (CallCommand.java:79) en py4j.GatewayConnection.run (GatewayConnection.java) ) en java.lang.Thread.run (Thread.java:745)

¿Puede alguien decirme qué estoy haciendo mal y cómo debo seguir adelante? Gracias por adelantado.

No se permite el uso de spark dentro de flatMap o cualquier transformación que ocurra en los ejecutores (la sesión de spark está disponible solo en el controlador). Tampoco es posible crear RDD de RDD (ver: ¿Es posible crear RDD nesteds en Apache Spark? )

Pero puede lograr esta transformación de otra manera: lea todo el contenido de all_files.txt en el dataframe, use el map local para convertirlos en marcos de datos y reduce localmente para unirlos todos, vea el ejemplo:

 >>> filenames = spark.read.text('all_files.txt').collect() >>> dataframes = map(lambda r: spark.read.text(r[0]), filenames) >>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)