PySpark: Uso de objetos en RDD

Actualmente estoy aprendiendo Python y quiero aplicarlo en / con Spark. Tengo este script muy simple (e inútil):

import sys from pyspark import SparkContext class MyClass: def __init__(self, value): self.v = str(value) def addValue(self, value): self.v += str(value) def getValue(self): return self.v if __name__ == "__main__": if len(sys.argv) != 1: print("Usage CC") exit(-1) data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4] sc = SparkContext(appName="WordCount") d = sc.parallelize(data) inClass = d.map(lambda input: (input, MyClass(input))) reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue)) print(reduzed.collect()) 

Al ejecutarlo con

spark-submit CustomClass.py

..el siguiente error es Thorwn (salida acortada):

 Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream for obj in iterator: File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps return pickle.dumps(obj, protocol) PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)... 

A mi la statement

PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed

parece ser importante Significa que las instancias de clase no pueden ser serializadas, ¿verdad? ¿Sabes cómo resolver este problema?

Gracias y saludos

Hay una serie de cuestiones:

  • Si coloca MyClass en un archivo separado, puede ser decapado. Este es un problema común para muchos usos Python de pickle. Esto es fácil de resolver moviendo MyClass y usando from myclass import MyClass . Normalmente, el dill puede solucionar estos problemas (como en la import dill as pickle ), pero no funcionó para mí aquí.
  • Una vez que esto se resuelve, su reducción no funciona desde que se llama a addValue return None (no return), no una instancia de MyClass . Necesitas cambiar addValue para devolverte.
  • Por último, la lambda debe llamar a getValue , por lo que debe tener a.addValue(b.getValue())

Juntos: myclass.py

 class MyClass: def __init__(self, value): self.v = str(value) def addValue(self, value): self.v += str(value) return self def getValue(self): return self.v 

main.py

 import sys from pyspark import SparkContext from myclass import MyClass if __name__ == "__main__": if len(sys.argv) != 1: print("Usage CC") exit(-1) data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4] sc = SparkContext(appName="WordCount") d = sc.parallelize(data) inClass = d.map(lambda input: (input, MyClass(input))) reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue())) print(reduzed.collect())