¿Por qué puedo pasar un método de instancia a multiprocessing.Process, pero no a multiprocessing.Pool?

Estoy intentando escribir una aplicación que aplica una función simultáneamente con un multiprocessing.Pool . Me gustaría que esta función fuera un método de instancia (para poder definirla de manera diferente en diferentes subclases). Esto no parece ser posible; Como he aprendido en otros lugares, los métodos aparentemente no pueden ser decapados . Entonces, ¿por qué el inicio de un proceso de multiprocessing.Process con un método enlazado funciona como un objective? El siguiente código:

 import multiprocessing def test1(): print "Hello, world 1" def increment(x): return x + 1 class testClass(): def process(self): process1 = multiprocessing.Process(target=test1) process1.start() process1.join() process2 = multiprocessing.Process(target=self.test2) process2.start() process2.join() def pool(self): pool = multiprocessing.Pool(1) for answer in pool.imap(increment, range(10)): print answer print for answer in pool.imap(self.square, range(10)): print answer def test2(self): print "Hello, world 2" def square(self, x): return x * x def main(): c = testClass() c.process() c.pool() if __name__ == "__main__": main() 

Produce esta salida:

 Hello, world 1 Hello, world 2 1 2 3 4 5 6 7 8 9 10 Exception in thread Thread-2: Traceback (most recent call last): File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner self.run() File "C:\Python27\Lib\threading.py", line 504, in run self.__target(*self.__args, **self.__kwargs) File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks put(task) PicklingError: Can't pickle : attribute lookup __builtin__.instancemethod failed 

¿Por qué los procesos pueden manejar los métodos enlazados, pero no los grupos?

El módulo de pickle normalmente no puede pickle métodos de instancia:

 >>> import pickle >>> class A(object): ... def z(self): print "hi" ... >>> a = A() >>> pickle.dumps(az) Traceback (most recent call last): File "", line 1, in  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps Pickler(file, protocol).dump(obj) File "/usr/local/lib/python2.7/pickle.py", line 224, in dump self.save(obj) File "/usr/local/lib/python2.7/pickle.py", line 306, in save rv = reduce(self.proto) File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex raise TypeError, "can't pickle %s objects" % base.__name__ TypeError: can't pickle instancemethod objects 

Sin embargo, el módulo de multiprocessing tiene un Pickler personalizado que agrega algún código para habilitar esta función :

 # # Try making some callable types picklable # from pickle import Pickler class ForkingPickler(Pickler): dispatch = Pickler.dispatch.copy() @classmethod def register(cls, type, reduce): def dispatcher(self, obj): rv = reduce(obj) self.save_reduce(obj=obj, *rv) cls.dispatch[type] = dispatcher def _reduce_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) ForkingPickler.register(type(ForkingPickler.save), _reduce_method) 

Puede replicar esto usando el módulo copy_reg para verlo funcionar por sí mismo:

 >>> import copy_reg >>> def _reduce_method(m): ... if m.im_self is None: ... return getattr, (m.im_class, m.im_func.func_name) ... else: ... return getattr, (m.im_self, m.im_func.func_name) ... >>> copy_reg.pickle(type(az), _reduce_method) >>> pickle.dumps(az) "c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n." 

Cuando usas Process.start para generar un nuevo proceso en Windows, selecciona todos los parámetros que ForkingPickler proceso hijo usando este ForkingPickler personalizado :

 # # Windows # else: # snip... from pickle import load, HIGHEST_PROTOCOL def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) # # We define a Popen class similar to the one from subprocess, but # whose constructor takes a process object as its argument. # class Popen(object): ''' Start a subprocess to run the code of a process object ''' _tls = thread._local() def __init__(self, process_obj): # create pipe for communication with child rfd, wfd = os.pipe() # get handle for read end of the pipe and make it inheritable ... # start process ... # set attributes of self ... # send information to child prep_data = get_preparation_data(process_obj._name) to_child = os.fdopen(wfd, 'wb') Popen._tls.process_handle = int(hp) try: dump(prep_data, to_child, HIGHEST_PROTOCOL) dump(process_obj, to_child, HIGHEST_PROTOCOL) finally: del Popen._tls.process_handle to_child.close() 

Tenga en cuenta la sección “enviar información al niño”. Está utilizando la función de dump , que usa ForkingPickler para ForkingPickler los datos, lo que significa que su método de instancia puede ser decapado.

Ahora, cuando usa métodos de multiprocessing.Pool de enviar un método a un proceso secundario, está utilizando un multiprocessing.Pipe Tubería para recoger los datos. En Python 2.7, multiprocessing.Pipe se implementa en C, y llama a pickle_dumps directamente , por lo que no aprovecha el ForkingPickler . Eso significa que el método de instancia de decapado no funciona.

Sin embargo, si utiliza copy_reg para registrar el tipo de copy_reg de instancemethod , en lugar de un Pickler personalizado, todos los bashs de decapado se verán afectados. Así que puedes usar eso para habilitar métodos de instancia de decapado, incluso a través de Pool :

 import multiprocessing import copy_reg import types def _reduce_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) copy_reg.pickle(types.MethodType, _reduce_method) def test1(): print("Hello, world 1") def increment(x): return x + 1 class testClass(): def process(self): process1 = multiprocessing.Process(target=test1) process1.start() process1.join() process2 = multiprocessing.Process(target=self.test2) process2.start() process2.join() def pool(self): pool = multiprocessing.Pool(1) for answer in pool.imap(increment, range(10)): print(answer) print for answer in pool.imap(self.square, range(10)): print(answer) def test2(self): print("Hello, world 2") def square(self, x): return x * x def main(): c = testClass() c.process() c.pool() if __name__ == "__main__": main() 

Salida:

 Hello, world 1 Hello, world 2 GOT (0, 0, (True, 1)) GOT (0, 1, (True, 2)) GOT (0, 2, (True, 3)) GOT (0, 3, (True, 4)) GOT (0, 4, (True, 5)) 1GOT (0, 5, (True, 6)) GOT (0, 6, (True, 7)) 2 GOT (0, 7, (True, 8)) 3 GOT (0, 8, (True, 9)) GOT (0, 9, (True, 10)) 4 5 6 7 8 9 10 GOT (1, 0, (True, 0)) 0 GOT (1, 1, (True, 1)) 1 GOT (1, 2, (True, 4)) 4 GOT (1, 3, (True, 9)) 9 GOT (1, 4, (True, 16)) 16 GOT (1, 5, (True, 25)) 25 GOT (1, 6, (True, 36)) 36 GOT (1, 7, (True, 49)) 49 GOT (1, 8, (True, 64)) 64 GOT (1, 9, (True, 81)) 81 GOT None 

También tenga en cuenta que en Python 3.x, pickle puede pickle tipos de métodos de instancia de forma nativa, por lo que nada de esto importa más. 🙂

Aquí hay una alternativa que uso algunas veces, y funciona en Python2.x:

Puede crear un “alias” de nivel superior para los métodos de instancia, que acepte un objeto cuyos métodos de instancia desea ejecutar en un grupo, y haga que llame a los métodos de instancia por usted:

 import functools import multiprocessing def _instance_method_alias(obj, arg): """ Alias for instance method that allows the method to be called in a multiprocessing pool """ obj.instance_method(arg) return class MyClass(object): """ Our custom class whose instance methods we want to be able to use in a multiprocessing pool """ def __init__(self): self.my_string = "From MyClass: {}" def instance_method(self, arg): """ Some arbitrary instance method """ print(self.my_string.format(arg)) return # create an object of MyClass obj = MyClass() # use functools.partial to create a new method that always has the # MyClass object passed as its first argument _bound_instance_method_alias = functools.partial(_instance_method_alias, obj) # create our list of things we will use the pool to map l = [1,2,3] # create the pool of workers pool = multiprocessing.Pool() # call pool.map, passing it the newly created function pool.map(_bound_instance_method_alias, l) # cleanup pool.close() pool.join() 

Este código produce esta salida:

De MyClass: 1
De MyClass: 2
De MyClass: 3

Una limitación es que no puede usar esto para los métodos que modifican el objeto. Cada proceso obtiene una copia del objeto al que está llamando los métodos, por lo que los cambios no se propagarán al proceso principal. Si no necesita modificar el objeto de los métodos que está llamando, esta puede ser una solución simple.

Esta es una forma más fácil de trabajar en Python 2, simplemente envuelva el método de instancia original. Funciona bien en MacOSX y Linux, no funciona en Windows, Python 2.7 probado

 from multiprocessing import Pool class Person(object): def __init__(self): self.name = 'Weizhong Tu' def calc(self, x): print self.name return x ** 5 def func(x, p=Person()): return p.calc(x) pool = Pool() print pool.map(func, range(10))