Cómo usar una clase Scala dentro de Pyspark

He estado buscando por un tiempo si hay alguna forma de usar una clase Scala en Pyspark , y no he encontrado ninguna documentación ni guía sobre este tema.

Digamos que creo una clase simple en Scala que usa algunas bibliotecas de apache-spark , algo como:

 class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) { def exe(): DataFrame = { import sqlContext.implicits._ df.select(col(column)) } } 
  • ¿Hay alguna forma posible de usar esta clase en Pyspark ?
  • ¿Es demasiado duro?
  • ¿Tengo que crear un archivo .py ?
  • ¿Hay alguna guía que muestre cómo hacer eso?

Por cierto, también miré el código de la spark y me sentí un poco perdido, y fui incapaz de replicar su funcionalidad para mi propio propósito.

Sí es posible aunque puede estar lejos de ser trivial. Por lo general, desea un envoltorio de Java (amigable) para que no tenga que lidiar con las características de Scala que no se pueden express fácilmente con Java simple y, como resultado, no funciona bien con la puerta de enlace Py4J.

Suponiendo que su clase es int en el paquete com.example y tenga un DataFrame Python llamado df

 df = ... # Python DataFrame 

tendrás que:

  1. Construye un flask usando tu herramienta de construcción favorita .

  2. --driver-class-path en la ruta de --driver-class-path del controlador, por ejemplo, utilizando el --driver-class-path para el shell / spark-submit PySpark. Dependiendo del código exacto, es posible que tenga que pasarlo usando --jars también

  3. Extraiga la instancia de JVM de una instancia de Python SparkContext :

     jvm = sc._jvm 
  4. Extraiga Scala SQLContext de una instancia de SQLContext :

     ssqlContext = sqlContext._ssql_ctx 
  5. Extraer Java DataFrame de la df :

     jdf = df._jdf 
  6. Crear nueva instancia de SimpleClass :

     simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v") 
  7. Llame al método exe y envuelva el resultado usando Python DataFrame :

     from pyspark.sql import DataFrame DataFrame(simpleObject.exe(), ssqlContext) 

El resultado debe ser un marco de DataFrame PySpark DataFrame . Por supuesto, puede combinar todos los pasos en una sola llamada.

Importante : este enfoque solo es posible si el código Python se ejecuta únicamente en el controlador. No se puede utilizar dentro de la acción o transformación de Python. Consulte ¿Cómo utilizar la función Java / Scala desde una acción o una transformación? para detalles.