Compartiendo una cola de resultados entre varios procesos.

La documentación para el módulo de multiprocessing muestra cómo pasar una cola a un proceso iniciado con multiprocessing.Process . Proceso. Pero, ¿cómo puedo compartir una cola con procesos de trabajo asíncronos iniciados con apply_async ? No necesito uniones dinámicas ni nada más, solo una forma de que los trabajadores (repetidamente) informen sus resultados a la base.

 import multiprocessing def worker(name, que): que.put("%d is done" % name) if __name__ == '__main__': pool = multiprocessing.Pool(processes=3) q = multiprocessing.Queue() workers = pool.apply_async(worker, (33, q)) 

Esto falla con: RuntimeError: Queue objects should only be shared between processes through inheritance . Entiendo lo que esto significa, y entiendo el consejo de heredar en lugar de exigir decapado / despeje (y todas las restricciones especiales de Windows). Pero, ¿cómo paso la cola de una manera que funcione? No puedo encontrar un ejemplo, y he probado varias alternativas que fallaron de varias maneras. ¿Ayuda por favor?

Intente utilizar el multiprocesamiento. Gestor para gestionar su cola y también para que sea accesible a diferentes trabajadores.

 import multiprocessing def worker(name, que): que.put("%d is done" % name) if __name__ == '__main__': pool = multiprocessing.Pool(processes=3) m = multiprocessing.Manager() q = m.Queue() workers = pool.apply_async(worker, (33, q)) 

multiprocessing.Pool ya tiene una cola de resultados compartida, no es necesario involucrar adicionalmente a Manager.Queue .

Además, los procesos de trabajo no se inician con .apply_async() , esto ya sucede cuando se crea una instancia de Pool . Lo que se inicia cuando llama a pool.apply_async() es un nuevo “trabajo”. Los procesos de trabajo de Pool ejecutan la función multiprocessing.pool.worker bajo el capó. Esta función se encarga de procesar las nuevas “tareas” transferidas a través del Pool._inqueue interno del Pool._inqueue y de enviar los resultados al padre a través del Pool._outqueue . Su func especificada se ejecutará dentro de multiprocessing.pool.worker . func solo tiene que return algo y el resultado se enviará automáticamente a los padres.

.apply_async() inmediatamente (de forma asíncrona) devuelve un objeto AsyncResult (alias para ApplyResult ). .get() llamar a .get() (está bloqueando) en ese objeto para recibir el resultado real. Otra opción sería registrar una función de callback , que se activa tan pronto como el resultado esté listo.

 from multiprocessing import Pool def busy_foo(i): """Dummy function simulating cpu-bound work.""" for _ in range(int(10e6)): # do stuff pass return i if __name__ == '__main__': with Pool(4) as pool: print(pool._outqueue) # DEMO results = [pool.apply_async(busy_foo, (i,)) for i in range(10)] # `.apply_async()` immediately returns AsyncResult (ApplyResult) object print(results[0]) # DEMO results = [res.get() for res in results] print(f'result: {results}') 

Ejemplo de salida:

   result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 

Nota: Especificar el .get() timeout para .get() no detendrá el procesamiento real de la tarea dentro del trabajador, solo desbloquea al padre en espera al generar un multiprocessing.TimeoutError .