¿Cómo usar Scala UDF en PySpark?

Quiero poder usar una función de Scala como UDF en PySpark

package com.test object ScalaPySparkUDFs extends Serializable { def testFunction1(x: Int): Int = { x * 2 } def testUDFFunction1 = udf { x: Int => testFunction1(x) } } 

Puedo acceder a testFunction1 en PySpark y hacer que devuelva valores:

 functions = sc._jvm.com.test.ScalaPySparkUDFs functions.testFunction1(10) 

Lo que quiero poder hacer es usar esta función como un UDF, idealmente en una llamada withColumn :

 row = Row("Value") numbers = sc.parallelize([1,2,3,4]).map(row).toDF() numbers.withColumn("Result", testUDFFunction1(numbers['Value'])) 

Creo que un enfoque prometedor es el que se encuentra aquí: Spark: ¿Cómo asignar Python con Scala o Java User Defined Functions?

Sin embargo, al realizar los cambios en el código encontrado para usar testUDFFunction1 en testUDFFunction1 lugar:

 def udf_test(col): sc = SparkContext._active_spark_context _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply return Column(_f(_to_seq(sc, [col], _to_java_column))) 

Yo obtengo:

  AttributeError: 'JavaMember' object has no attribute 'apply' 

No entiendo esto porque creo que testUDFFunction1 tiene un método de aplicación.

No quiero usar expresiones del tipo que se encuentra aquí: Registre UDF a SqlContext desde Scala para usar en PySpark

Cualquier sugerencia sobre cómo hacer este trabajo sería apreciada!

La pregunta que has vinculado está utilizando un object Scala. El object Scala es un singleton y puede utilizar el método de apply directamente.

Aquí se usa una función nula que devuelve un objeto de la clase UserDefinedFunction co, primero debe llamar a la función:

 _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end Column(_f.apply(_to_seq(sc, [col], _to_java_column))) 

De acuerdo con @ user6910411, debe llamar al método de aplicación directamente en la función. Por lo tanto, su código será.

UDF en Scala:

 import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions._ object ScalaPySparkUDFs { def testFunction1(x: Int): Int = { x * 2 } def getFun(): UserDefinedFunction = udf(testFunction1 _ ) } 

Código PySpark:

 def test_udf(col): sc = spark.sparkContext _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun() return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column))) row = Row("Value") numbers = sc.parallelize([1,2,3,4]).map(row).toDF() numbers.withColumn("Result", test_udf(numbers['Value']))