¿Cómo puedo recuperar el valor de retorno de una función pasada a multiprocessing.Process?

En el código de ejemplo a continuación, me gustaría recuperar el valor de retorno de la función worker . ¿Cómo puedo hacer esto? ¿Dónde se almacena este valor?

Código de ejemplo:

 import multiprocessing def worker(procnum): '''worker function''' print str(procnum) + ' represent!' return procnum if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start() for proc in jobs: proc.join() print jobs 

Salida:

 0 represent! 1 represent! 2 represent! 3 represent! 4 represent! [, , , , ] 

Parece que no puedo encontrar el atributo relevante en los objetos almacenados en jobs .

Gracias de antemano, blz

Utiliza la variable compartida para comunicarte. Por ejemplo como este:

 import multiprocessing def worker(procnum, return_dict): '''worker function''' print str(procnum) + ' represent!' return_dict[procnum] = procnum if __name__ == '__main__': manager = multiprocessing.Manager() return_dict = manager.dict() jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,return_dict)) jobs.append(p) p.start() for proc in jobs: proc.join() print return_dict.values() 

Creo que el enfoque sugerido por @sega_sai es el mejor. Pero realmente necesita un ejemplo de código, así que aquí va:

 import multiprocessing from os import getpid def worker(procnum): print 'I am number %d in process %d' % (procnum, getpid()) return getpid() if __name__ == '__main__': pool = multiprocessing.Pool(processes = 3) print pool.map(worker, range(5)) 

Lo que imprimirá los valores de retorno:

 I am number 0 in process 19139 I am number 1 in process 19138 I am number 2 in process 19140 I am number 3 in process 19139 I am number 4 in process 19140 [19139, 19138, 19140, 19139, 19140] 

Si está familiarizado con el map (el Python 2 incorporado) esto no debería ser demasiado difícil. De lo contrario echa un vistazo al enlace de sega_Sai .

Note lo poco que se necesita el código. (También tenga en cuenta cómo se reutilizan los procesos).

Este ejemplo muestra cómo utilizar una lista de instancias de multiprocesamiento.Pipe para devolver cadenas desde un número arbitrario de procesos:

 import multiprocessing def worker(procnum, send_end): '''worker function''' result = str(procnum) + ' represent!' print result send_end.send(result) def main(): jobs = [] pipe_list = [] for i in range(5): recv_end, send_end = multiprocessing.Pipe(False) p = multiprocessing.Process(target=worker, args=(i, send_end)) jobs.append(p) pipe_list.append(recv_end) p.start() for proc in jobs: proc.join() result_list = [x.recv() for x in pipe_list] print result_list if __name__ == '__main__': main() 

Salida:

 0 represent! 1 represent! 2 represent! 3 represent! 4 represent! ['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!'] 

Esta solución usa menos recursos que un multiprocesamiento.

  • un tubo
  • al menos un locking
  • un amortiguador
  • un hilo

o un multiproceso.SimpleQueue que utiliza

  • un tubo
  • al menos un locking

Es muy instructivo mirar la fuente de cada uno de estos tipos.

Parece que deberías usar la clase multiprocessing.Pool en su lugar y usar los métodos .apply () .apply_async (), map ()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

Puede utilizar la función de exit incorporada para establecer el código de salida de un proceso. Se puede obtener del atributo exitcode del proceso:

 import multiprocessing def worker(procnum): print str(procnum) + ' represent!' exit(procnum) if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start() result = [] for proc in jobs: proc.join() result.append(proc.exitcode) print result 

Salida:

 0 represent! 1 represent! 2 represent! 3 represent! 4 represent! [0, 1, 2, 3, 4] 

Por alguna razón, no pude encontrar un ejemplo general de cómo hacer esto con la Queue cualquier lugar (incluso los ejemplos de documentos de Python no generan múltiples procesos), así que esto es lo que obtuve después de 10 bashs:

 def add_helper(queue, arg1, arg2): # the func called in child processes ret = arg1 + arg2 queue.put(ret) def multi_add(): # spawns child processes q = Queue() processes = [] rets = [] for _ in range(0, 100): p = Process(target=add_helper, args=(q, 1, 2)) processes.append(p) p.start() for p in processes: ret = q.get() # will block rets.append(ret) for p in processes: p.join() return rets 

Queue es una Queue locking segura para subprocesos que puede utilizar para almacenar los valores de retorno de los procesos secundarios. Así que tienes que pasar la cola a cada proceso. Algo menos obvio aquí es que debes get() de la cola antes de join los Process o, de lo contrario, la cola se llena y bloquea todo.

Actualización para aquellos que están orientados a objetos (probado en Python 3.4):

 from multiprocessing import Process, Queue class Multiprocessor(): def __init__(self): self.processes = [] self.queue = Queue() @staticmethod def _wrapper(func, queue, args, kwargs): ret = func(*args, **kwargs) queue.put(ret) def run(self, func, *args, **kwargs): args2 = [func, self.queue, args, kwargs] p = Process(target=self._wrapper, args=args2) self.processes.append(p) p.start() def wait(self): rets = [] for p in self.processes: ret = self.queue.get() rets.append(ret) for p in self.processes: p.join() return rets # tester if __name__ == "__main__": mp = Multiprocessor() num_proc = 64 for _ in range(num_proc): # queue up multiple tasks running `sum` mp.run(sum, [1, 2, 3, 4, 5]) ret = mp.wait() # get all results print(ret) assert len(ret) == num_proc and all(r == 15 for r in ret) 

Para cualquier otra persona que esté buscando cómo obtener un valor de un Process utilizando la Queue :

 import multiprocessing ret = {'foo': False} def worker(queue): ret = queue.get() ret['foo'] = True queue.put(ret) if __name__ == '__main__': queue = multiprocessing.Queue() queue.put(ret) p = multiprocessing.Process(target=worker, args=(queue,)) p.start() print queue.get() # Prints {"foo": True} p.join() 

Modifiqué un poco la respuesta de vartec ya que necesitaba obtener los códigos de error de la función. (¡Gracias, vertec! Es un truco impresionante)

Esto también se puede hacer con un manager.list pero creo que es mejor tenerlo en un dict y almacenar una lista dentro de él. De esa manera, mantendremos la función y los resultados, ya que no podemos estar seguros del orden en que se rellenará la lista.

 from multiprocessing import Process import time import datetime import multiprocessing def func1(fn, m_list): print 'func1: starting' time.sleep(1) m_list[fn] = "this is the first function" print 'func1: finishing' # return "func1" # no need for return since Multiprocess doesnt return it =( def func2(fn, m_list): print 'func2: starting' time.sleep(3) m_list[fn] = "this is function 2" print 'func2: finishing' # return "func2" def func3(fn, m_list): print 'func3: starting' time.sleep(9) # if fail wont join the rest because it never populate the dict # or do a try/except to get something in return. raise ValueError("failed here") # if we want to get the error in the manager dict we can catch the error try: raise ValueError("failed here") m_list[fn] = "this is third" except: m_list[fn] = "this is third and it fail horrible" # print 'func3: finishing' # return "func3" def runInParallel(*fns): # * is to accept any input in list start_time = datetime.datetime.now() proc = [] manager = multiprocessing.Manager() m_list = manager.dict() for fn in fns: # print fn # print dir(fn) p = Process(target=fn, name=fn.func_name, args=(fn, m_list)) p.start() proc.append(p) for p in proc: p.join() # 5 is the time out print datetime.datetime.now() - start_time return m_list, proc if __name__ == '__main__': manager, proc = runInParallel(func1, func2, func3) # print dir(proc[0]) # print proc[0]._name # print proc[0].name # print proc[0].exitcode # here you can check what did fail for i in proc: print i.name, i.exitcode # name was set up in the Process line 53 # here will only show the function that worked and where able to populate the # manager dict for i, j in manager.items(): print dir(i) # things you can do to the function print i, j 

Una solución simple:

 import multiprocessing output=[] data = range(0,10) def f(x): return x**2 def handler(): p = multiprocessing.Pool(64) r=p.map(f, data) return r if __name__ == '__main__': output.append(handler()) print(output[0]) 

Salida:

 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]