Desbordamiento de stack mientras se procesan varias columnas con un UDF

Tengo un DataFrame con muchas columnas de tipo str , y quiero aplicar una función a todas esas columnas, sin cambiar el nombre de sus nombres ni agregar más columnas. Intenté usar un bucle for-in ejecutando withColumn (ver ejemplo abajo), pero normalmente cuando ejecuto el código, muestra un Stack Overflow (rara vez funciona), este DataFrame no es grande en absoluto, tiene solo ~ 15000 registros.

 # df is a DataFrame def lowerCase(string): return string.strip().lower() lowerCaseUDF = udf(lowerCase, StringType()) for (columnName, kind) in df.dtypes: if(kind == "string"): df = df.withColumn(columnName, lowerCaseUDF(df[columnName])) df.select("Tipo_unidad").distinct().show() 

El error completo es muy largo, por lo tanto decidí pegar solo algunas líneas. Pero puedes encontrar la traza completa aquí Traza completa

Py4JJavaError: Se produjo un error al llamar a o516.showString. : org.apache.spark.SparkException: trabajo abortado debido a una falla en la etapa: la tarea 1 en la etapa 2.0 falló 4 veces, la falla más reciente: tarea perdida 1.3 en la etapa 2.0 (TID 38, worker2.mcbo.mood.com.ve): java.lang.StackOverflowError at java.io.ObjectInputStream $ BlockDataInputStream.readByte (ObjectInputStream.java:2774)

Estoy pensando que este problema se produce porque este código inicia muchos trabajos (uno para cada columna de tipo string ), ¿podría mostrarme otra alternativa o lo que estoy haciendo mal?

Intenta algo como esto:

 from pyspark.sql.functions import col, lower, trim exprs = [ lower(trim(col(c))).alias(c) if t == "string" else col(c) for (c, t) in df.dtypes ] df.select(*exprs) 

Este enfoque tiene dos ventajas principales sobre su solución actual:

  • requiere solo una proyección única (no hay un linaje en crecimiento que sea el responsable más probable de SO) en lugar de una proyección por columna de cadena.
  • opera directamente solo una representación interna sin pasar datos a Python ( BatchPythonProcessing ).