rebashs de grupo de multiprocesamiento python

¿Hay una manera de reenviar una parte de los datos para su procesamiento, si el cálculo original falló, usando un grupo simple?

import random from multiprocessing import Pool def f(x): if random.getrandbits(1): raise ValueError("Retry this computation") return x*x p = Pool(5) # If one of these f(x) calls fails, retry it with another (or same) process p.map(f, [1,2,3]) 

Si puede (o no le importa) volver a intentarlo de inmediato, use un decorador que complete la función:

 import random from multiprocessing import Pool from functools import wraps def retry(f): @wraps(f) def wrapped(*args, **kwargs): while True: try: return f(*args, **kwargs) except ValueError: pass return wrapped @retry def f(x): if random.getrandbits(1): raise ValueError("Retry this computation") return x*x p = Pool(5) # If one of these f(x) calls fails, retry it with another (or same) process p.map(f, [1,2,3]) 

Puede usar una Queue para retroalimentar fallas en el Pool través de un bucle en el Process inicio:

 import multiprocessing as mp import random def f(x): if random.getrandbits(1): # on failure / exception catch fqput(x) return None return x*x def f_init(q): fq = q def main(pending): total_items = len(pending) successful = [] failure_tracker = [] q = mp.Queue() p = mp.Pool(None, f_init, [q]) results = p.imap(f, pending) retry_results = [] while len(successful) < total_items: successful.extend([r for r in results if not r is None]) successful.extend([r for r in retry_results if not r is None]) failed_items = [] while not q.empty(): failed_items.append(q.get()) if failed_items: failure_tracker.append(failed_items) retry_results = p.imap(f, failed_items); p.close() p.join() print "Results: %s" % successful print "Failures: %s" % failure_tracker if __name__ == '__main__': main(range(1, 10)) 

La salida es así:

 Results: [1, 4, 36, 49, 25, 81, 16, 64, 9] Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []] 

Un Pool no puede ser compartido entre múltiples procesos. De ahí este enfoque basado en la Queue . Si intenta pasar una agrupación como parámetro a los procesos de agrupaciones, obtendrá este error:

 NotImplementedError: pool objects cannot be passed between processes or pickled 

Alternativamente, puede probar algunos bashs inmediatos dentro de su función f , para evitar la sobrecarga de sincronización. Realmente es una cuestión de qué tan pronto su función debería esperar para volver a intentarlo, y de qué tan probable es un éxito si se reintenta de inmediato.


Respuesta antigua: para completar, aquí está mi respuesta anterior, que no es tan óptima como volver a enviarla directamente a la piscina, pero podría ser relevante dependiendo del caso de uso, ya que proporciona una forma natural de lidiar con / limitar n -rebashs de nivel:

Puede usar una Queue para agregar fallos y reenviar al final de cada ejecución, en múltiples ejecuciones:

 import multiprocessing as mp import random def f(x): if random.getrandbits(1): # on failure / exception catch fqput(x) return None return x*x def f_init(q): fq = q def main(pending): run_number = 1 while pending: jobs = pending pending = [] q = mp.Queue() p = mp.Pool(None, f_init, [q]) results = p.imap(f, jobs) p.close() p.join() failed_items = [] while not q.empty(): failed_items.append(q.get()) successful = [r for r in results if not r is None] print "(%d) Succeeded: %s" % (run_number, successful) print "(%d) Failed: %s" % (run_number, failed_items) print pending = failed_items run_number += 1 if __name__ == '__main__': main(range(1, 10)) 

con salida como esta:

 (1) Succeeded: [9, 16, 36, 81] (1) Failed: [2, 1, 5, 7, 8] (2) Succeeded: [64] (2) Failed: [2, 1, 5, 7] (3) Succeeded: [1, 25] (3) Failed: [2, 7] (4) Succeeded: [49] (4) Failed: [2] (5) Succeeded: [4] (5) Failed: []