Evitar las condiciones de carrera en las colas de multiprocesamiento de Python 3

Estoy tratando de encontrar el peso máximo de alrededor de 6.1 billones de artículos (personalizados) y me gustaría hacer esto con el parallel processing. Para mi aplicación particular, hay mejores algoritmos que no requieren mi iteración de más de 6.100 millones de elementos, pero el libro de texto que los explica está sobre mi cabeza y mi jefe quiere que esto se haga en 4 días. Pensé que tengo una mejor oportunidad con el servidor de lujo de mi empresa y el parallel processing. Sin embargo, todo lo que sé sobre el parallel processing proviene de leer la documentación de Python . Lo que quiere decir que estoy bastante perdido …

Mi teoría actual es configurar un proceso de alimentación, una cola de entrada, un grupo completo (por ejemplo, 30) de procesos de trabajo y una cola de salida (encontrar el elemento máximo en la cola de salida será trivial). Lo que no entiendo es cómo el proceso del alimentador puede decirle al proceso de trabajo cuándo debe dejar de esperar que los elementos lleguen a la cola de entrada.

Había pensado en usar multiprocessing.Pool.map_async en mi iterable de 6.1E9 elementos, pero toma casi 10 minutos recorrer los elementos sin hacerles nada. A menos que esté malinterpretando algo … , tener map_async iterar a través de ellos para asignarlos a los procesos se podría hacer mientras los procesos comienzan su trabajo. ( Pool también proporciona imap pero la documentación dice que es similar al map , que no parece funcionar de forma asíncrona. Quiero asíncrono, ¿verdad? )

Preguntas relacionadas : ¿Quiero usar concurrent.futures lugar de multiprocessing ? No podría ser la primera persona en implementar un sistema de dos colas (así es exactamente cómo funcionan las líneas en cada tienda de delicatessen en Estados Unidos …) así que, ¿hay una forma más Pythonic / incorporada para hacer esto?

Aquí hay un esqueleto de lo que estoy tratando de hacer. Ver el bloque de comentarios en el medio.

 import multiprocessing as mp import queue def faucet(items, bathtub): """Fill bathtub, a process-safe queue, with 6.1e9 items""" for item in items: bathtub.put(item) bathtub.close() def drain_filter(bathtub, drain): """Put maximal item from bathtub into drain. Bathtub and drain are process-safe queues. """ max_weight = 0 max_item = None while True: try: current_item = bathtub.get() # The following line three lines are the ones that I can't # quite figure out how to trigger without a race condition. # What I would love is to trigger them AFTER faucet calls # bathtub.close and the bathtub queue is empty. except queue.Empty: drain.put((max_weight, max_item)) return else: bathtub.task_done() if not item.is_relevant(): continue current_weight = item.weight if current_weight > max_weight: max_weight = current_weight max_item = current_item def parallel_max(items, nprocs=30): """The elements of items should have a method `is_relevant` and an attribute `weight`. `items` itself is an immutable iterator object. """ bathtub_q = mp.JoinableQueue() drain_q = mp.Queue() faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q)) worker_procs = mp.Pool(processes=nprocs) faucet_proc.start() worker_procs.apply_async(drain_filter, bathtub_q, drain_q) finalists = [] for i in range(nprocs): finalists.append(drain_q.get()) return max(finalists) 

AQUI ESTA LA RESPUESTA

Encontré una respuesta muy completa a mi pregunta y una introducción suave a la multitarea del director de comunicaciones de la Fundación Python, Doug Hellman. Lo que quería era el patrón de “píldora venenosa”. Compruébelo aquí: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

Props a @MRAB por publicar el kernel de ese concepto.

Puede poner un elemento de terminación especial, como Ninguno, en la cola. Cuando un trabajador lo ve, puede devolverlo para que lo vean los demás trabajadores y luego terminar. Alternativamente, puede poner un elemento de terminación especial por trabajador en la cola.