Pool.map multiproceso de Python para múltiples argumentos

En la biblioteca de multiprocesamiento de Python, ¿existe una variante de pool.map que admita múltiples argumentos?

text = "test" def harvester(text, case): X = case[0] text+ str(X) if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET pool.map(harvester(text,case),case, 1) pool.close() pool.join() 

La respuesta a esto depende de la versión y de la situación. La respuesta más general para las versiones recientes de Python (desde la versión 3.3) fue descrita a continuación por JF Sebastian . 1 Utiliza el método Pool.starmap , que acepta una secuencia de tuplas de argumentos. Luego, automáticamente desempaqueta los argumentos de cada tupla y los pasa a la función dada:

 import multiprocessing from itertools import product def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with multiprocessing.Pool(processes=3) as pool: results = pool.starmap(merge_names, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ... 

Para versiones anteriores de Python, deberá escribir una función auxiliar para desempaquetar los argumentos explícitamente. Si desea utilizar with , también deberá escribir un contenedor para convertir Pool en un administrador de contexto. (Gracias a muon por señalar esto).

 import multiprocessing from itertools import product from contextlib import contextmanager def merge_names(a, b): return '{} & {}'.format(a, b) def merge_names_unpack(args): return merge_names(*args) @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with poolcontext(processes=3) as pool: results = pool.map(merge_names_unpack, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ... 

En casos más simples, con un segundo argumento fijo, también puede usar partial , pero solo en Python 2.7+.

 import multiprocessing from functools import partial from contextlib import contextmanager @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with poolcontext(processes=3) as pool: results = pool.map(partial(merge_names, b='Sons'), names) print(results) # Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ... 

1. Gran parte de esto se inspiró en su respuesta, que probablemente debería haber sido aceptada en su lugar. Pero como esta está atascada en la parte superior, parecía mejor mejorarla para futuros lectores.

¿Existe una variante de pool.map que soporte múltiples argumentos?

Python 3.3 incluye el método pool.starmap() :

 #!/usr/bin/env python3 from functools import partial from itertools import repeat from multiprocessing import Pool, freeze_support def func(a, b): return a + b def main(): a_args = [1,2,3] second_arg = 1 with Pool() as pool: L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)]) M = pool.starmap(func, zip(a_args, repeat(second_arg))) N = pool.map(partial(func, b=second_arg), a_args) assert L == M == N if __name__=="__main__": freeze_support() main() 

Para versiones anteriores:

 #!/usr/bin/env python2 import itertools from multiprocessing import Pool, freeze_support def func(a, b): print a, b def func_star(a_b): """Convert `f([1,2])` to `f(1,2)` call.""" return func(*a_b) def main(): pool = Pool() a_args = [1,2,3] second_arg = 1 pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg))) if __name__=="__main__": freeze_support() main() 

Salida

 1 1 2 1 3 1 

Observe cómo se itertools.izip() e itertools.repeat() aquí.

Debido al error mencionado por @unutbu, no puede usar functools.partial() o capacidades similares en Python 2.6, por lo que la simple función de envoltura func_star() debe definirse explícitamente. Vea también la solución sugerida por uptimebox .

Creo que lo de abajo será mejor.

 def multi_run_wrapper(args): return add(*args) def add(x,y): return x+y if __name__ == "__main__": from multiprocessing import Pool pool = Pool(4) results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)]) print results 

salida

 [3, 5, 7] 

Usando Python 3.3 o superior con pool.starmap():

 from multiprocessing.dummy import Pool as ThreadPool def write(i, x): print(i, "---", x) a = ["1","2","3"] b = ["4","5","6"] pool = ThreadPool(2) pool.starmap(write, zip(a,b)) pool.close() pool.join() 

Resultado:

 1 --- 4 2 --- 5 3 --- 6 

También puede zip () más argumentos si lo desea: zip(a,b,c,d,e)

En caso de que quiera que se le pase un valor constante como argumento, debe usar import itertools y luego zip(itertools.repeat(constant), a) por ejemplo.

Después de haber aprendido acerca de itertools en la respuesta de JF Sebastian , decidí ir un paso más allá y escribir un paquete parmap que se encargue de la paralelización, ofreciendo funciones de map y starmap en python-2.7 y python-3.2 (y más tarde también) que pueden tomar cualquier número de los argumentos posicionales.

Instalación

 pip install parmap 

Cómo paralelizar:

 import parmap # If you want to do: y = [myfunction(x, argument1, argument2) for x in mylist] # In parallel: y = parmap.map(myfunction, mylist, argument1, argument2) # If you want to do: z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist] # In parallel: z = parmap.starmap(myfunction, mylist, argument1, argument2) # If you want to do: listx = [1, 2, 3, 4, 5, 6] listy = [2, 3, 4, 5, 6, 7] param = 3.14 param2 = 42 listz = [] for (x, y) in zip(listx, listy): listz.append(myfunction(x, y, param1, param2)) # In parallel: listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2) 

He cargado parmap en PyPI y en un repository de github .

Como ejemplo, la pregunta se puede responder de la siguiente manera:

 import parmap def harvester(case, text): X = case[0] text+ str(X) if __name__ == "__main__": case = RAW_DATASET # assuming this is an iterable parmap.map(harvester, case, "test", chunksize=1) 

Hay una bifurcación de multiprocessing llamada pathos ( nota: use la versión en github ) que no necesita starmap mapa de starmap : las funciones del mapa reflejan la API del mapa de python, por lo tanto, el mapa puede tomar múltiples argumentos. Con pathos , generalmente también puede hacer multiprocesamiento en el intérprete, en lugar de quedarse atascado en el bloque __main__ . Pathos se debe a un lanzamiento, después de algunas actualizaciones leves, principalmente la conversión a Python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) [GCC 4.2.1 (Apple Inc. build 5566)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> def func(a,b): ... print a,b ... >>> >>> from pathos.multiprocessing import ProcessingPool >>> pool = ProcessingPool(nodes=4) >>> pool.map(func, [1,2,3], [1,1,1]) 1 1 2 1 3 1 [None, None, None] >>> >>> # also can pickle stuff like lambdas >>> result = pool.map(lambda x: x**2, range(10)) >>> result [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> >>> # also does asynchronous map >>> result = pool.amap(pow, [1,2,3], [4,5,6]) >>> result.get() [1, 32, 729] >>> >>> # or can return a map iterator >>> result = pool.imap(pow, [1,2,3], [4,5,6]) >>> result  >>> list(result) [1, 32, 729] 

Puede usar las siguientes dos funciones para evitar escribir un contenedor para cada nueva función:

 import itertools from multiprocessing import Pool def universal_worker(input_pair): function, args = input_pair return function(*args) def pool_args(function, *args): return zip(itertools.repeat(function), zip(*args)) 

Utilice la función de function con las listas de argumentos arg_0 , arg_1 y arg_2 siguiente manera:

 pool = Pool(n_core) list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2) pool.close() pool.join() 

Una mejor manera es usar el decorador en lugar de escribir la función de envoltura a mano. Especialmente cuando tiene muchas funciones para asignar, el decorador ahorrará tiempo al evitar escribir envoltorio para cada función. Por lo general, una función decorada no es seleccionable, sin embargo, podemos usar functools para functools . Más disscusiones se pueden encontrar aquí .

Aqui el ejemplo

 def unpack_args(func): from functools import wraps @wraps(func) def wrapper(args): if isinstance(args, dict): return func(**args) else: return func(*args) return wrapper @unpack_args def func(x, y): return x + y 

Entonces puedes mapearlo con argumentos comprimidos.

 np, xlist, ylist = 2, range(10), range(10) pool = Pool(np) res = pool.map(func, zip(xlist, ylist)) pool.close() pool.join() 

Por supuesto, siempre puede usar Pool.starmap en Python 3 (> = 3.3) como se menciona en otras respuestas.

Otra alternativa simple es envolver los parámetros de la función en una tupla y luego envolver los parámetros que también deben pasarse en las tuplas. Tal vez esto no sea ideal cuando se trata de grandes datos. Creo que haría copias para cada tupla.

 from multiprocessing import Pool def f((a,b,c,d)): print a,b,c,d return a + b + c +d if __name__ == '__main__': p = Pool(10) data = [(i+0,i+1,i+2,i+3) for i in xrange(10)] print(p.map(f, data)) p.close() p.join() 

Da la salida en algún orden aleatorio:

 0 1 2 3 1 2 3 4 2 3 4 5 3 4 5 6 4 5 6 7 5 6 7 8 7 8 9 10 6 7 8 9 8 9 10 11 9 10 11 12 [6, 10, 14, 18, 22, 26, 30, 34, 38, 42] 

Una mejor solución para python2:

 from multiprocessing import Pool def func((i, (a, b))): print i, a, b return a + b pool = Pool(3) pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))]) 

2 3 4

1 2 3

0 1 2

afuera[]:

[3, 5, 7]

# “Cómo tomar múltiples argumentos”.

 def f1(args): a, b, c = args[0] , args[1] , args[2] return a+b+c if __name__ == "__main__": import multiprocessing pool = multiprocessing.Pool(4) result1 = pool.map(f1, [ [1,2,3] ]) print(result1) 

Otra forma es pasar una lista de listas a una rutina de un argumento:

 import os from multiprocessing import Pool def task(args): print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1] pool = Pool() pool.map(task, [ [1,2], [3,4], [5,6], [7,8] ]) 

Uno puede construir una lista de argumentos con su método favorito.

Desde Python 3.4.4, puedes usar multiprocessing.get_context () para obtener un objeto de contexto para usar múltiples métodos de inicio:

 import multiprocessing as mp def foo(q, h, w): q.put(h + ' ' + w) print(h + ' ' + w) if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,'hello', 'world')) p.start() print(q.get()) p.join() 

O simplemente reemplaza

 pool.map(harvester(text,case),case, 1) 

por:

 pool.apply_async(harvester(text,case),case, 1) 

En la documentación oficial se afirma que apoya solo un argumento iterable. Me gusta usar apply_async en tales casos. En tu caso yo haría:

 from multiprocessing import Process, Pool, Manager text = "test" def harvester(text, case, q = None): X = case[0] res = text+ str(X) if q: q.put(res) return res def block_until(q, results_queue, until_counter=0): i = 0 while i < until_counter: results_queue.put(q.get()) i+=1 if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET m = Manager() q = m.Queue() results_queue = m.Queue() # when it completes results will reside in this queue blocking_process = Process(block_until, (q, results_queue, len(case))) blocking_process.start() for c in case: try: res = pool.apply_async(harvester, (text, case, q = None)) res.get(timeout=0.1) except: pass blocking_process.join() 
 text = "test" def unpack(args): return args[0](*args[1:]) def harvester(text, case): X = case[0] text+ str(X) if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET # args is a list of tuples # with the function to execute as the first item in each tuple args = [(harvester, text, c) for c in case] # doing it this way, we can pass any function # and we don't need to define a wrapper for each different function # if we need to use more than one pool.map(unpack, args) pool.close() pool.join() 

Este es un ejemplo de la rutina que utilizo para pasar múltiples argumentos a una función de un argumento utilizada en una bifurcación pool.imap :

 from multiprocessing import Pool # Wrapper of the function to map: class makefun: def __init__(self, var2): self.var2 = var2 def fun(self, i): var2 = self.var2 return var1[i] + var2 # Couple of variables for the example: var1 = [1, 2, 3, 5, 6, 7, 8] var2 = [9, 10, 11, 12] # Open the pool: pool = Pool(processes=2) # Wrapper loop for j in range(len(var2)): # Obtain the function to map pool_fun = makefun(var2[j]).fun # Fork loop for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0): print(var1[i], '+' ,var2[j], '=', value) # Close the pool pool.close() 

Hay muchas respuestas aquí, pero ninguna parece proporcionar un código compatible con Python 2/3 que funcione en cualquier versión. Si desea que su código solo funcione , esto funcionará para cualquiera de las versiones de Python:

 # For python 2/3 compatibility, define pool context manager # to support the 'with' statement in Python 2 if sys.version_info[0] == 2: from contextlib import contextmanager @contextmanager def multiprocessing_context(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() else: multiprocessing_context = multiprocessing.Pool 

Después de eso, puedes usar el multiprocesamiento de manera regular de Python 3, como quieras. Por ejemplo:

 def _function_to_run_for_each(x): return x.lower() with multiprocessing_context(processes=3) as pool: results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim']) print(results) 

Trabajará en Python 2 o Python 3.

para python2, puedes usar este truco

 def fun(a,b): return a+b pool = multiprocessing.Pool(processes=6) b=233 pool.map(lambda x:fun(x,b),range(1000))