What can multiprocessing and dill do together?

I would like to use the multiprocessing library in Python. Unfortunately, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions defined in __main__. All three of these are important to me.

```
In [1]: import pickle

In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
```
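The closure case fails the same way at dump time; a quick stdlib-only check (the `make_adder` helper here is just for illustration, not from either library). Note that a plain named function in __main__ actually pickles fine by reference; it only breaks when the bytes are loaded in a process whose __main__ is different:

```python
import pickle

def make_adder(n):
    # Nested function closing over n; the stock pickler serializes
    # functions by qualified name, and this one has no importable name.
    def add(x):
        return x + n
    return add

def fails_to_pickle(obj):
    try:
        pickle.dumps(obj)
        return False
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type varies across Python versions.
        return True

fails_to_pickle(lambda x: x)    # lambda: no importable name -> True
fails_to_pickle(make_adder(3))  # closure: not module-level -> True
```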

Fortunately, there is dill, a more robust pickler. Apparently dill performs some magic at import time to make plain pickle work:

```
In [3]: import dill

In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
```
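dill also handles the closure case, because it serializes the function by value (code object plus closure cells) rather than by name. A minimal sketch, assuming dill is installed; `make_adder` is an illustrative helper, not part of dill:

```python
import dill

def make_adder(n):
    # A closure that plain pickle rejects (see above).
    def add(x):
        return x + n
    return add

# dill serializes the closure by value, so it survives a round trip
restored = dill.loads(dill.dumps(make_adder(3)))
restored(4)  # → 7
```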

This is very encouraging, especially because I don't have access to the multiprocessing source code. Unfortunately, I still can't get this very basic example to work:

```python
import multiprocessing as mp
import dill

p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))
```

Why is this? What am I missing? What exactly are the limitations of the multiprocessing + dill combination?

Temporary edit for J.F. Sebastian

```
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

^C
...lots of junk...

[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()
```

multiprocessing makes some bad choices about pickling. Don't get me wrong, it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill, which can do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing were to use pickle instead of cPickle… and also drop some of its own pickling overrides, then dill could take over and give a much fuller serialization for multiprocessing.
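Until then, one common workaround is to hand the pool only a plain, importable top-level function and smuggle the real callable through as a dill byte string. A sketch, assuming dill is installed; the helper names `run_dill_encoded` and `apply_async_dill` are mine, not part of either library:

```python
import dill

def run_dill_encoded(payload):
    # Runs on the worker: unpack a dill-serialized (function, args)
    # pair and call it.
    fun, args = dill.loads(payload)
    return fun(*args)

def apply_async_dill(pool, fun, args):
    # run_dill_encoded is a module-level function, so the stock
    # multiprocessing pickler accepts it; the lambda or closure
    # travels inside the opaque byte string instead.
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))

# The worker-side unwrapping, exercised locally:
run_dill_encoded(dill.dumps((lambda x: x**2, (7,))))  # → 49
```

With a real `multiprocessing.Pool p`, `apply_async_dill(p, lambda x: x**2, (10,)).get()` should then work where `p.apply_async(lambda x: x**2, (10,))` fails, since multiprocessing only ever sees the pickle-safe wrapper.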

Until that happens, there is a fork of multiprocessing called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, like multi-args in the map function. Pathos is due for a release, after some mild updating, mostly the conversion to Python 3.x.

```
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

and just to show off a little of what pathos.multiprocessing can do:

```
>>> def busy_add(x,y, delay=0.01):
...     for n in range(x):
...        x += n
...     for n in range(y):
...        y -= n
...     import time
...     time.sleep(delay)
...     return x + y
...
>>> def busy_squared(x):
...     import time, random
...     time.sleep(2*random.random())
...     return x*x
...
>>> def squared(x):
...     return x*x
...
>>> def quad_factory(a=1, b=1, c=0):
...     def quad(x):
...         return a*x**2 + b*x + c
...     return quad
...
>>> square_plus_one = quad_factory(2,0,1)
>>>
>>> def test1(pool):
...     print pool
...     print "x: %s\n" % str(x)
...     print pool.map.__name__
...     start = time.time()
...     res = pool.map(squared, x)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...     print pool.imap.__name__
...     start = time.time()
...     res = pool.imap(squared, x)
...     print "time to queue:", time.time() - start
...     start = time.time()
...     res = list(res)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...     print pool.amap.__name__
...     start = time.time()
...     res = pool.amap(squared, x)
...     print "time to queue:", time.time() - start
...     start = time.time()
...     res = res.get()
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...
>>> def test2(pool, items=4, delay=0):
...     _x = range(-items/2,items/2,2)
...     _y = range(len(_x))
...     _d = [delay]*len(_x)
...     print map
...     res1 = map(busy_squared, _x)
...     res2 = map(busy_add, _x, _y, _d)
...     print pool.map
...     _res1 = pool.map(busy_squared, _x)
...     _res2 = pool.map(busy_add, _x, _y, _d)
...     assert _res1 == res1
...     assert _res2 == res2
...     print pool.imap
...     _res1 = pool.imap(busy_squared, _x)
...     _res2 = pool.imap(busy_add, _x, _y, _d)
...     assert list(_res1) == res1
...     assert list(_res2) == res2
...     print pool.amap
...     _res1 = pool.amap(busy_squared, _x)
...     _res2 = pool.amap(busy_add, _x, _y, _d)
...     assert _res1.get() == res1
...     assert _res2.get() == res2
...     print ""
...
>>> def test3(pool):  # test against a function that should fail in pickle
...     print pool
...     print "x: %s\n" % str(x)
...     print pool.map.__name__
...     start = time.time()
...     res = pool.map(square_plus_one, x)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...
>>> def test4(pool, maxtries, delay):
...     print pool
...     m = pool.amap(busy_add, x, x)
...     tries = 0
...     while not m.ready():
...         time.sleep(delay)
...         tries += 1
...         print "TRY: %s" % tries
...         if tries >= maxtries:
...             print "TIMEOUT"
...             break
...     print m.get()
...
>>> import time
>>> x = range(18)
>>> delay = 0.01
>>> items = 20
>>> maxtries = 20
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> pool = Pool(nodes=4)
>>> test1(pool)
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

map
time to results: 0.0553691387177
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

imap
time to queue: 7.91549682617e-05
time to results: 0.102381229401
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

amap
time to queue: 7.08103179932e-05
time to results: 0.0489699840546
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

>>> test2(pool, items, delay)

>>> test3(pool)
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

map
time to results: 0.0523059368134
y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]

>>> test4(pool, maxtries, delay)
TRY: 1
TRY: 2
TRY: 3
TRY: 4
TRY: 5
TRY: 6
TRY: 7
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]
```