Multiprocesamiento de Python PicklingError: No se puede encurtir

Lamento no poder reproducir el error con un ejemplo más simple, y mi código es demasiado complicado para publicar. Si ejecuto el progtwig en el shell de IPython en lugar del Python normal, las cosas funcionan bien.

Busqué algunas notas anteriores sobre este problema. Todos fueron causados ​​por el uso de la función pool to call definida dentro de una función class. Pero este no es el caso para mí.

Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib64/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks put(task) PicklingError: Can't pickle : attribute lookup __builtin__.function failed 

Apreciaría cualquier ayuda.

Actualización : la función I pickle se define en el nivel superior del módulo. Aunque llama a una función que contiene una función anidada. es decir, f() llama a g() llama a h() que tiene una función anidada i() , y estoy llamando a pool.apply_async(f) . f() , g() , h() están todos definidos en el nivel superior. Intenté un ejemplo más simple con este patrón y funciona bien.

Aquí hay una lista de lo que puede ser encurtido . En particular, las funciones solo son seleccionables si están definidas en el nivel superior de un módulo.

Esta pieza de código:

 import multiprocessing as mp class Foo(): @staticmethod def work(self): pass if __name__ == '__main__': pool = mp.Pool() foo = Foo() pool.apply_async(foo.work) pool.close() pool.join() 

produce un error casi idéntico al que publicaste:

 Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks put(task) PicklingError: Can't pickle : attribute lookup __builtin__.function failed 

El problema es que todos los métodos de pool utilizan una queue.Queue para pasar las tareas a los procesos de trabajo. Todo lo que pasa a través de la queue.Queue La queue.Queue debe ser seleccionable, y foo.work no es seleccionable ya que no está definido en el nivel superior del módulo.

Se puede arreglar definiendo una función en el nivel superior, que llama a foo.work() :

 def work(foo): foo.work() pool.apply_async(work,args=(foo,)) 

Observe que foo es seleccionable, ya que Foo se define en el nivel superior y foo.__dict__ es seleccionable.

pathos.multiprocesssing , en lugar de multiprocessing . pathos.multiprocessing es una bifurcación de multiprocessing que utiliza dill . dill puede serializar casi cualquier cosa en python, por lo que puedes enviar mucho más en paralelo. La bifurcación de pathos también tiene la capacidad de trabajar directamente con múltiples funciones de argumento, según sea necesario para los métodos de clase.

 >>> from pathos.multiprocessing import ProcessingPool as Pool >>> p = Pool(4) >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> p.map(t.plus, x, y) [4, 6, 8, 10] >>> >>> class Foo(object): ... @staticmethod ... def work(self, x): ... return x+1 ... >>> f = Foo() >>> p.apipe(f.work, f, 100)  >>> res = _ >>> res.get() 101 

Obtenga pathos (y si lo desea, dill ) aquí: https://github.com/uqfoundation

Como han dicho otros, el multiprocessing solo puede transferir objetos de Python a procesos de trabajo que pueden ser decapados. Si no puede reorganizar su código como lo describe unutbu, puede usar las capacidades ampliadas de decapado / descortezado de dill para transferir datos (especialmente datos de código) como se muestra a continuación.

Esta solución solo requiere la instalación de dill y no otras bibliotecas como pathos :

 import os from multiprocessing import Pool import dill def run_dill_encoded(payload): fun, args = dill.loads(payload) return fun(*args) def apply_async(pool, fun, args): payload = dill.dumps((fun, args)) return pool.apply_async(run_dill_encoded, (payload,)) if __name__ == "__main__": pool = Pool(processes=5) # asyn execution of lambda jobs = [] for i in range(10): job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1)) jobs.append(job) for job in jobs: print job.get() print # async execution of static method class O(object): @staticmethod def calc(): return os.getpid() jobs = [] for i in range(10): job = apply_async(pool, O.calc, ()) jobs.append(job) for job in jobs: print job.get() 

Descubrí que también puedo generar exactamente esa salida de error en un fragmento de código que funciona perfectamente al intentar usar el generador de perfiles en él.

Tenga en cuenta que esto fue en Windows (donde el forking es un poco menos elegante).

Estaba corriendo:

 python -m profile -o output.pstats  

Y encontró que al eliminar el perfil se eliminó el error y la ubicación del perfil lo restauró. También me estaba volviendo loca porque sabía que el código solía funcionar. Estaba revisando para ver si algo había actualizado pool.py ... luego tuve una sensación de hundimiento y eliminé el perfil y eso fue todo.

Publicar aquí para los archivos en caso de que alguien más se tope con él.

Esta solución solo requiere la instalación de dill y no otras bibliotecas como pathos

 def apply_packed_function_for_map((dumped_function, item, args, kwargs),): """ Unpack dumped function as target function and call it with arguments. :param (dumped_function, item, args, kwargs): a tuple of dumped function and its arguments :return: result of target function """ target_function = dill.loads(dumped_function) res = target_function(item, *args, **kwargs) return res def pack_function_for_map(target_function, items, *args, **kwargs): """ Pack function and arguments to object that can be sent from one multiprocessing.Process to another. The main problem is: «multiprocessing.Pool.map*» or «apply*» cannot use class methods or closures. It solves this problem with «dill». It works with target function as argument, dumps it («with dill») and returns dumped function with arguments of target function. For more performance we dump only target function itself and don't dump its arguments. How to use (pseudo-code): ~>>> import multiprocessing ~>>> images = [...] ~>>> pool = multiprocessing.Pool(100500) ~>>> features = pool.map( ~... *pack_function_for_map( ~... super(Extractor, self).extract_features, ~... images, ~... type='png' ~... **options, ~... ) ~... ) ~>>> :param target_function: function, that you want to execute like target_function(item, *args, **kwargs). :param items: list of items for map :param args: positional arguments for target_function(item, *args, **kwargs) :param kwargs: named arguments for target_function(item, *args, **kwargs) :return: tuple(function_wrapper, dumped_items) It returs a tuple with * function wrapper, that unpack and call target function; * list of packed target function and its' arguments. """ dumped_function = dill.dumps(target_function) dumped_items = [(dumped_function, item, args, kwargs) for item in items] return apply_packed_function_for_map, dumped_items 

También funciona para matrices numpy.

 Can't pickle : attribute lookup __builtin__.function failed 

Este error también se producirá si tiene alguna función incorporada dentro del objeto modelo que se pasó al trabajo asíncrono.

Así que asegúrese de verificar que los objetos del modelo que se pasan no tengan funciones incorporadas. (En nuestro caso, estábamos usando la función FieldTracker() de django-model-utils dentro del modelo para rastrear un campo determinado). Aquí está el enlace al tema relevante de GitHub.