Necesita una cola de mensajes asíncrona segura para subprocesos

Estoy buscando una clase de Python (preferiblemente parte del lenguaje estándar, en lugar de una biblioteca de terceros) para administrar los mensajes asíncronos de “estilo de transmisión”.

Tendré un hilo que pone mensajes en la cola (el método ‘putMessageOnQueue’ no debe bloquearse) y luego varios otros hilos que estarán todos esperando mensajes, habiendo llamado probablemente alguna función de locking ‘waitForMessage’. Cuando se coloca un mensaje en la cola, quiero que cada uno de los hilos en espera obtenga su propia copia del mensaje.

He examinado la clase de Queue incorporada, pero no creo que esto sea adecuado porque el consumo de mensajes parece implicar eliminarlos de la cola, por lo que solo uno de los subprocesos del cliente verá cada uno.

Esto parece que debería ser un caso de uso común, ¿alguien puede recomendar una solución?

Creo que el enfoque típico de esto es usar una cola de mensajes separada para cada hilo, y empujar el mensaje en cada cola que haya registrado previamente un interés en recibir tales mensajes.

Algo como esto debería funcionar, pero es un código no probado …

 from time import sleep from threading import Thread from Queue import Queue class DispatcherThread(Thread): def __init__(self, *args, **kwargs): super(DispatcherThread, self).__init__(*args, **kwargs) self.interested_threads = [] def run(self): while 1: if some_condition: self.dispatch_message(some_message) else: sleep(0.1) def register_interest(self, thread): self.interested_threads.append(thread) def dispatch_message(self, message): for thread in self.interested_threads: thread.put_message(message) class WorkerThread(Thread): def __init__(self, *args, **kwargs): super(WorkerThread, self).__init__(*args, **kwargs) self.queue = Queue() def run(self): # Tell the dispatcher thread we want messages dispatcher_thread.register_interest(self) while 1: # Wait for next message message = self.queue.get() # Process message # ... def put_message(self, message): self.queue.put(message) dispatcher_thread = DispatcherThread() dispatcher_thread.start() worker_threads = [] for i in range(10): worker_thread = WorkerThread() worker_thread.start() worker_threads.append(worker_thread) dispatcher_thread.join() 

Creo que este es un ejemplo más directo (tomado del ejemplo de Cola en Python Lib )

 from threading import Thread from Queue import Queue num_worker_threads = 2 def worker(): while True: item = q.get() do_work(item) q.task_done() q = Queue() for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # block until all tasks are done