Tengo un conjunto de archivos. La ruta a los archivos se guardan en un archivo., Diga “all_files.txt”. Usando apache spark, necesito hacer una operación en todos los archivos y agrupar los resultados.
Los pasos que quiero hacer son:
Este es el código que escribí para el mismo:
def return_contents_from_file (file_name): return spark.read.text(file_name).rdd.map(lambda r: r[0]) def run_spark(): file_name = 'path_to_file' spark = SparkSession \ .builder \ .appName("PythonWordCount") \ .getOrCreate() counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file .flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files .flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
Esto está lanzando el error:
línea 323, en get_return_value py4j.protocol.Py4JError: Ocurrió un error al llamar a o25. getnewargs . Rastreo: py4j.Py4JExcepción: El método getnewargs ([]) no existe en py4j.reflection.ReflectionEngine.getMethod (ReflectionEngine.java:318) en py4j.reflection.getMethod (ReflectionEngine.payas.payas). invoke (Gateway.java:272) en py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) en py4j.commands.CallCommand.execute (CallCommand.java:79) en py4j.GatewayConnection.run (GatewayConnection.java) ) en java.lang.Thread.run (Thread.java:745)
¿Puede alguien decirme qué estoy haciendo mal y cómo debo seguir adelante? Gracias por adelantado.
No se permite el uso de spark
dentro de flatMap
o cualquier transformación que ocurra en los ejecutores (la sesión de spark
está disponible solo en el controlador). Tampoco es posible crear RDD de RDD (ver: ¿Es posible crear RDD nesteds en Apache Spark? )
Pero puede lograr esta transformación de otra manera: lea todo el contenido de all_files.txt
en el dataframe, use el map
local para convertirlos en marcos de datos y reduce
localmente para unirlos todos, vea el ejemplo:
>>> filenames = spark.read.text('all_files.txt').collect() >>> dataframes = map(lambda r: spark.read.text(r[0]), filenames) >>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)