Usando UDF ignora la condición cuando

Supongamos que tiene el siguiente DataFrame pyspark:

data= [('foo',), ('123',), (None,), ('bar',)] df = sqlCtx.createDataFrame(data, ["col"]) df.show() #+----+ #| col| #+----+ #| foo| #| 123| #|null| #| bar| #+----+ 

Los siguientes dos bloques de código deberían hacer lo mismo, es decir, devolver la mayúscula de la columna si no es null . Sin embargo, el segundo método (usando un udf ) produce un error.

Método 1 : utilizar pyspark.sql.functions.upper()

 import pyspark.sql.functions as f df.withColumn( 'upper', f.when( f.isnull(f.col('col')), f.col('col') ).otherwise(f.upper(f.col('col'))) ).show() #+----+-----+ #| col|upper| #+----+-----+ #| foo| FOO| #| 123| 123| #|null| null| #| bar| BAR| #+----+-----+ 

Método 2 : Usar str.upper() dentro de un udf

 df.withColumn( 'upper', f.when( f.isnull(f.col('col')), f.col('col') ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col'))) ).show() 

Esto me da AttributeError: 'NoneType' object has no attribute 'upper' . ¿Por qué se f.isnull() en la llamada when aparentemente se ignora?

Sé que puedo cambiar mi udf a f.udf(lambda x: x.upper() if x else x, StringType()) para evitar este error, pero me gustaría entender por qué está sucediendo.

Rastreo completo :

 Py4JJavaErrorTraceback (most recent call last)  in () 4 f.isnull(f.col('col')), 5 f.col('col') ----> 6 ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col'))) 7 ).show() /opt/SPARK2/lib/spark2/python/pyspark/sql/dataframe.py in show(self, n, truncate) 316 """ 317 if isinstance(truncate, bool) and truncate: --> 318 print(self._jdf.showString(n, 20)) 319 else: 320 print(self._jdf.showString(n, int(truncate))) /opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: /opt/SPARK2/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling o642.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 77.0 failed 4 times, most recent failure: Lost task 51.3 in stage 77.0 (TID 5101, someserver.prod.somecompany.net, executor 99): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main process() File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in  func = lambda _, it: map(mapper, it) File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 92, in  mapper = lambda a: udf(*a) File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 70, in  return lambda *a: f(*a) File "", line 6, in  AttributeError: 'NoneType' object has no attribute 'upper' 

Related of "Usando UDF ignora la condición cuando"

Tienes que recordar que Spark SQL (a diferencia de RDD) no es lo que ves, es lo que obtienes. El optimizador / planificador es libre de progtwigr operaciones en el orden arbitrario o incluso repetir las etapas varias veces.

Los udfs Python no se aplican en una base de Row , sino que se utilizan en modo batch. when no se ignora mucho, pero no se utiliza para optimizar el plan de ejecución:

 == Physical Plan == *Project [col#0, CASE WHEN isnull(col#0) THEN col#0 ELSE pythonUDF0#21 END AS upper#17] +- BatchEvalPython [(col#0)], [col#0, pythonUDF0#21] +- Scan ExistingRDD[col#0] 

Por lo tanto, la función utilizada con udf debe ser robusta a las entradas None , por ejemplo:

 df.withColumn( 'upper', f.udf( lambda x: x.upper() if x is not None else None, StringType() )(f.col('col')) ).show()