Uso de la función Reducir () de Python para unir múltiples marcos de datos de PySpark

¿Alguien sabe por qué el uso de functools.reduce() de functools.reduce() podría llevar a un peor rendimiento al unir múltiples DataFrames de PySpark que unir iterativamente los mismos DataFrames usando un bucle for ? Específicamente, esto produce una desaceleración masiva seguida de un error de memoria insuficiente:

 def join_dataframes(list_of_join_columns, left_df, right_df): return left_df.join(right_df, on=list_of_join_columns) joined_df = functools.reduce( functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes, ) 

mientras que este no lo hace:

 joined_df = list_of_dataframes[0] joined_df.cache() for right_df in list_of_dataframes[1:]: joined_df = joined_df.join(right_df, on=list_of_join_columns) 

Cualquier idea sería apreciada grandemente ¡Gracias!

Una razón es que una reducción o un pliegue suele ser funcionalmente puro: el resultado de cada operación de acumulación no se escribe en la misma parte de la memoria, sino en un nuevo bloque de memoria.

En principio, el recolector de basura podría liberar el bloque anterior después de cada acumulación, pero si no lo hace, asignará memoria para cada versión actualizada del acumulador.

Siempre y cuando use CPython (diferentes implementaciones pueden, pero en realidad no deberían mostrar un comportamiento significativamente diferente en este caso específico). Si analiza la implementación reducida, verá que es solo un bucle for con un mínimo manejo de excepciones.

El núcleo es exactamente equivalente al bucle que usas

 for element in it: value = function(value, element) 

y no hay evidencia que respalde las afirmaciones de ningún comportamiento especial.

Además, pruebas sencillas con un número de fotogtwigs. Limitaciones prácticas de las combinaciones Spark (las combinaciones son una de las operaciones más caras en Spark )

 dfs = [ spark.range(10000).selectExpr( "rand({}) AS id".format(i), "id AS value", "{} AS loop ".format(i) ) for i in range(200) ] 

No se muestran diferencias significativas en el tiempo entre el bucle for directo

 def f(dfs): df1 = dfs[0] for df2 in dfs[1:]: df1 = df1.join(df2, ["id"]) return df1 %timeit -n3 f(dfs) ## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each) 

y reduce invocación

 from functools import reduce def g(dfs): return reduce(lambda x, y: x.join(y, ["id"]), dfs) %timeit -n3 g(dfs) ### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each) 

De manera similar, los patrones de comportamiento de JVM en general son comparables entre los ciclos for-loop.

Para CPU de bucle y uso de memoria – VisualVM

y reduce

reducir el uso de la CPU y la memoria – VisualVM

Finalmente ambos generan planes de ejecución idénticos.

 g(dfs)._jdf.queryExecution().optimizedPlan().equals( f(dfs)._jdf.queryExecution().optimizedPlan() ) ## True 

lo que indica que no hay diferencia cuando se evalúan los planes y es probable que se produzcan OOM.

En otras palabras, la correlación no implica causalidad, y es poco probable que los problemas de rendimiento observados estén relacionados con el método que utiliza para combinar DataFrames .