Trabajo de multiprocesamiento de Python para la tarea de apio pero AttributeError

Hice una función multiprocesada como esta,

import multiprocessing import pandas as pd import numpy as np def _apply_df(args): df, func, kwargs = args return df.apply(func, **kwargs) def apply_by_multiprocessing(df, func, **kwargs): workers = kwargs.pop('workers') pool = multiprocessing.Pool(processes=workers) result = pool.map(_apply_df, [(d, func, kwargs) for d in np.array_split(df, workers)]) pool.close() return pd.concat(list(result)) def square(x): return x**x if __name__ == '__main__': df = pd.DataFrame({'a':range(10), 'b':range(10)}) apply_by_multiprocessing(df, square, axis=1, workers=4) ## run by 4 processors 

Arriba, “apply_by_multiprocessing” puede ejecutar Pandas Dataframe en paralelo. Pero cuando llego a la tarea de apio, Levantó AssertionError: el objeto ‘Trabajador’ no tiene atributo ‘_config’.

 from celery import shared_task @shared_task def my_multiple_job(): df = pd.DataFrame({'a':range(10), 'b':range(10)}) apply_by_multiprocessing(df, square, axis=1, workers=4) 

Su traza de error es así.

  File "/Users/yong27/work/goldstar/kinmatch/utils.py", line 14, in apply_by_multiprocessing pool = multiprocessing.Pool(processes=workers) File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 118, in Pool context=self.get_context()) File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 146, in __init__ self._setup_queues() File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 238, in _setup_queues self._inqueue = self._ctx.SimpleQueue() File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue return SimpleQueue(ctx=self.get_context()) File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__ self._rlock = ctx.Lock() File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock return Lock(ctx=self.get_context()) File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 164, in __init__ SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 60, in __init__ kind, value, maxvalue, self._make_name(), File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 118, in _make_name return '%s-%s' % (process.current_process()._config['semprefix'], AttributeError: 'Worker' object has no attribute '_config' 

Parece que porque el trabajador del apio no es un proceso normal. ¿Como puedó resolver esté problema? Estoy usando Python3.4, Django 1.6.2, apio 3.1.10, django-apio 3.1.9, pandas 0.12.0.

Este problema tiene una buena respuesta en esta otra pregunta.

Básicamente, es un problema conocido de Celery y se proporciona un truco sucio : funcionó para mí, acabo de agregar el siguiente código en el mismo archivo donde se definen mis tareas:

 from celery.signals import worker_process_init from multiprocessing import current_process @worker_process_init.connect def fix_multiprocessing(**kwargs): try: current_process()._config except AttributeError: current_process()._config = {'semprefix': '/mp'} 

No sé por qué no funciona el multiprocesamiento, pero te recomiendo que uses la tarea de grupo de apio.

 from celery import task, group def feeds_fetch(feeds): g = group(fetch_one.s(feed) for feed in feeds) g.apply_async() @task() def fetch_one(feed): return feed.fetch()