Usando UDF ignora la condición cuando

Supongamos que tiene el siguiente DataFrame pyspark:

data= [('foo',), ('123',), (None,), ('bar',)] df = sqlCtx.createDataFrame(data, ["col"]) df.show() #+----+ #| col| #+----+ #| foo| #| 123| #|null| #| bar| #+----+ 

Los siguientes dos bloques de código deberían hacer lo mismo, es decir, devolver la mayúscula de la columna si no es null . Sin embargo, el segundo método (usando un udf ) produce un error.

Método 1 : utilizar pyspark.sql.functions.upper()

 import pyspark.sql.functions as f df.withColumn( 'upper', f.when( f.isnull(f.col('col')), f.col('col') ).otherwise(f.upper(f.col('col'))) ).show() #+----+-----+ #| col|upper| #+----+-----+ #| foo| FOO| #| 123| 123| #|null| null| #| bar| BAR| #+----+-----+ 

Método 2 : Usar str.upper() dentro de un udf

 df.withColumn( 'upper', f.when( f.isnull(f.col('col')), f.col('col') ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col'))) ).show() 

Esto me da AttributeError: 'NoneType' object has no attribute 'upper' . ¿Por qué se f.isnull() en la llamada when aparentemente se ignora?

Sé que puedo cambiar mi udf a f.udf(lambda x: x.upper() if x else x, StringType()) para evitar este error, pero me gustaría entender por qué está sucediendo.

Rastreo completo :

 Py4JJavaErrorTraceback (most recent call last)  in () 4 f.isnull(f.col('col')), 5 f.col('col') ----> 6 ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col'))) 7 ).show() /opt/SPARK2/lib/spark2/python/pyspark/sql/dataframe.py in show(self, n, truncate) 316 """ 317 if isinstance(truncate, bool) and truncate: --> 318 print(self._jdf.showString(n, 20)) 319 else: 320 print(self._jdf.showString(n, int(truncate))) /opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: /opt/SPARK2/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling o642.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 77.0 failed 4 times, most recent failure: Lost task 51.3 in stage 77.0 (TID 5101, someserver.prod.somecompany.net, executor 99): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main process() File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in  func = lambda _, it: map(mapper, it) File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 92, in  mapper = lambda a: udf(*a) File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 70, in  return lambda *a: f(*a) File "", line 6, in  AttributeError: 'NoneType' object has no attribute 'upper' 

Tienes que recordar que Spark SQL (a diferencia de RDD) no es lo que ves, es lo que obtienes. El optimizador / planificador es libre de progtwigr operaciones en el orden arbitrario o incluso repetir las etapas varias veces.

Los udfs Python no se aplican en una base de Row , sino que se utilizan en modo batch. when no se ignora mucho, pero no se utiliza para optimizar el plan de ejecución:

 == Physical Plan == *Project [col#0, CASE WHEN isnull(col#0) THEN col#0 ELSE pythonUDF0#21 END AS upper#17] +- BatchEvalPython [(col#0)], [col#0, pythonUDF0#21] +- Scan ExistingRDD[col#0] 

Por lo tanto, la función utilizada con udf debe ser robusta a las entradas None , por ejemplo:

 df.withColumn( 'upper', f.udf( lambda x: x.upper() if x is not None else None, StringType() )(f.col('col')) ).show()