Uso del objeto Python personalizado en Pyspark UDF

Cuando se ejecuta la siguiente pieza de código PySpark:

nlp = NLPFunctions() def parse_ingredients(ingredient_lines): parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0] return list(chain.from_iterable(parsed_ingredients)) udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType())) 

Recibo el siguiente error: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

Me imagino que esto se debe a que PySpark no puede serializar esta clase personalizada. Pero, ¿cómo puedo evitar la sobrecarga de crear una instancia de este objeto costoso en cada ejecución de la función parse_ingredients_line ?

Supongamos que desea utilizar la clase de Identity definida de esta manera ( identity.py ):

 class Identity(object): def __getstate__(self): raise NotImplementedError("Not serializable") def identity(self, x): return x 

Por ejemplo, puede usar un objeto que se puede f.py ( f.py ) y almacenar una instancia de Identity como miembro de la clase:

 from identity import Identity class F(object): identity = None def __call__(self, x): if not F.identity: F.identity = Identity() return F.identity.identity(x) 

y use estos como se muestra a continuación:

 from pyspark.sql.functions import udf import f sc.addPyFile("identity.py") sc.addPyFile("f.py") f_ = udf(fF()) spark.range(3).select(f_("id")).show() 
 +-----+ |F(id)| +-----+ | 0| | 1| | 2| +-----+ 

o función autónoma y cierre:

 from pyspark.sql.functions import udf import identity sc.addPyFile("identity.py") def f(): dict_ = {} @udf() def f_(x): if "identity" not in dict_: dict_["identity"] = identity.Identity() return dict_["identity"].identity(x) return f_ spark.range(3).select(f()("id")).show() 
 +------+ |f_(id)| +------+ | 0| | 1| | 2| +------+ 

Lo resolví basándose en ( https://github.com/scikit-learn/scikit-learn/issues/6975 ) haciendo que todas las dependencias de la clase NLPFunctions sean serializables.

Editar: esta respuesta es incorrecta. El objeto todavía se serializa y luego se des-serializa cuando se transmite, por lo que no se evita la serialización. (¿ Consejos para usar correctamente grandes variables de transmisión? )


Trate de usar una variable de difusión .

 sc = SparkContext() nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format. def parse_ingredients(ingredient_lines): parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0] return list(chain.from_iterable(parsed_ingredients))