Spark can't pickle method_descriptor

I'm getting this strange error message:

    15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
    Traceback (most recent call last):
      File "/home/user/inverted-index.py", line 78, in <module>
        print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
      File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
        return self.mapPartitions(topIterator).reduce(merge)
      File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
        vals = self.mapPartitions(func).collect()
      File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
        bytesInJava = self._jrdd.collect().iterator()
      File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
        pickled_command = ser.dumps(command)
      File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
        return cloudpickle.dumps(obj, 2)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
        cp.dump(obj)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, in dump
        return pickle.Pickler.dump(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      [... many repeated recursive pickle.py / cloudpickle.py frames elided:
       save, save_tuple, save_list, save_dict, save_function_tuple,
       save_inst, save_reduce ...]
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
        save(cls)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 442, in save_global
        raise pickle.PicklingError("Can't pickle builtin %s" % obj)
    pickle.PicklingError: Can't pickle builtin

My update function returns a list of tuples of the form (key, (value1, value2)), where everything is a string, as shown below:

    def update(doc):
        doc_id = doc[0][path_len:-ext_len]  # actual file name
        content = doc[1].lower()
        new_fi = regex.split(content)
        old_fi = fi_table.row(doc_id)
        fi_table.put(doc_id, {'cf:col': ",".join(new_fi)})
        if not old_fi:
            return [(term, ('add', doc_id)) for term in new_fi]
        else:
            new_fi = set(new_fi)
            old_fi = set(old_fi['cf:col'].split(','))
            return [(term, ('add', doc_id)) for term in new_fi - old_fi] + \
                   [(term, ('del', doc_id)) for term in old_fi - new_fi]

EDIT: The problem lies in these two HBase functions, row and put. When I comment them out the code works (setting old_fi to an empty dictionary), but if either of them runs, it produces the error above. I use happybase to talk to HBase from Python. Can anyone explain what's going wrong?

Spark tries to serialize the connection object so it can be used inside the executors, which is bound to fail because a deserialized connection object cannot carry its read/write access over into another scope (or even onto another machine). The problem can be reproduced by trying to broadcast the connection object. So in this instance, the failure was in serializing an I/O object.
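Concretely, the closure for update() drags in the module-level fi_table handle, so the driver-side pattern that triggers this presumably looks something like the sketch below (the connection details are invented for illustration):

    import happybase

    # driver-side globals; because update() references fi_table, cloudpickle
    # tries to serialize the connection (and its open socket) with the closure
    connection = happybase.Connection('hbase-host')  # assumed host
    fi_table = connection.table('fi')                # assumed table name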

The problem was partly solved by connecting to the database inside the map functions. Since that would open a connection for every single RDD element, far too many, I had to switch to partition-level processing, which cut the DB connections from 20k down to roughly 8-64 (depending on the number of partitions); a sketch follows. Spark's developers should consider providing an initialization function/script for the executors to avoid this kind of dead-end problem.
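A minimal sketch of that per-partition approach, assuming happybase and reusing path_len, ext_len and regex from the question (the host and table name here are made up):

    import happybase

    def update_partition(docs):
        # one HBase connection per partition instead of one per element
        connection = happybase.Connection('hbase-host')  # assumed host
        fi_table = connection.table('fi')                # assumed table name
        for doc in docs:
            doc_id = doc[0][path_len:-ext_len]
            content = doc[1].lower()
            new_fi = regex.split(content)
            old_fi = fi_table.row(doc_id)
            fi_table.put(doc_id, {'cf:col': ",".join(new_fi)})
            if not old_fi:
                for term in new_fi:
                    yield (term, ('add', doc_id))
            else:
                new_fi = set(new_fi)
                old_fi = set(old_fi['cf:col'].split(','))
                for term in new_fi - old_fi:
                    yield (term, ('add', doc_id))
                for term in old_fi - new_fi:
                    yield (term, ('del', doc_id))
        connection.close()

    # mapPartitions replaces flatMap(update); the generator already
    # yields flattened (term, (op, doc_id)) pairs
    print sc.wholeTextFiles(data_dir).mapPartitions(update_partition).top(10)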

So, say we had such an init function, executed once on every node; then each node would connect to the database (via a connection pool, or separate ZooKeeper nodes), because the init function and the map functions would share the same scope, and the problem would be gone. You would also end up with faster code than the workaround I found. At the end of the run, Spark would free/unload those variables and the program would finish.
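Lacking such a hook, one way to approximate it is a lazily initialized module-level connection, so each executor process connects at most once (host and table name again assumed):

    import happybase

    _connection = None

    def get_table():
        # created on first use inside an executor process and then reused
        # by every subsequent task running in that process
        global _connection
        if _connection is None:
            _connection = happybase.Connection('hbase-host')  # assumed host
        return _connection.table('fi')                        # assumed table name

Map functions then call get_table() instead of closing over a driver-side handle, which keeps the connection object out of the pickled closure.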

If it really is a pickling problem with a MethodDescriptorType, you can register how to pickle a MethodDescriptorType, with this:

    def _getattr(objclass, name, repr_str):
        # hack to grab the reference directly
        try:
            attr = repr_str.split("'")[3]
            return eval(attr + '.__dict__["' + name + '"]')
        except:
            attr = getattr(objclass, name)
            if name == '__dict__':
                attr = attr[name]
            return attr

    def save_wrapper_descriptor(pickler, obj):
        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
                                       obj.__repr__()), obj=obj)
        return

    # register the following "type" with:
    #   Pickler.dispatch[MethodDescriptorType] = save_wrapper_descriptor
    # (where Pickler is the pickler class Spark uses,
    #  e.g. pyspark.cloudpickle.CloudPickler)
    MethodDescriptorType = type(type.__dict__['mro'])

Then, if you register the above in the pickle dispatch table that Spark uses (as shown in the comment above, or with copy_reg), you should be able to get past the pickling error.
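For instance, with copy_reg the registration could look roughly like this; the reducer here simply re-fetches the descriptor from its owning class instead of using the _getattr hack above:

    import copy_reg

    # the type of built-in method descriptors, e.g. type.mro
    MethodDescriptorType = type(type.__dict__['mro'])

    def _reduce_method_descriptor(obj):
        # rebuild the descriptor by looking it up on its class at load time
        return getattr, (obj.__objclass__, obj.__name__)

    copy_reg.pickle(MethodDescriptorType, _reduce_method_descriptor)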