¿”Seleccionar” en múltiples colas de multiproceso de Python?

¿Cuál es la mejor manera de esperar (sin girar) hasta que haya algo disponible en una de las dos colas (multiprocesamiento), donde ambas residen en el mismo sistema?

No parece que haya una manera oficial de manejar esto todavía. O al menos, no basado en esto:

Puede intentar algo como lo que hace esta publicación: acceder a los identificadores de archivo subyacentes:

y luego usar seleccionar.

En realidad, puede utilizar objetos multiprocessing.Queue en select.select. es decir

que = multiprocessing.Queue() (input,[],[]) = select.select([que._reader],[],[]) 

seleccionaría que solo si está listo para ser leído.

Aunque no hay documentación al respecto. Estaba leyendo el código fuente de la biblioteca multiprocessing.queue (en linux suele ser algo como /usr/lib/python2.6/multiprocessing/queue.py) para averiguarlo.

Con Queue.Queue no encontré ninguna forma inteligente de hacer esto (y me encantaría).

Parece que el uso de subprocesos que reenvían los elementos entrantes a una Cola única en la que luego espera es una opción práctica cuando se utiliza el multiprocesamiento de forma independiente de la plataforma.

Para evitar los subprocesos, es necesario manejar tuberías / FD de bajo nivel, que son específicas de la plataforma y no son fáciles de manejar de manera coherente con la API de nivel superior.

O necesitaría Colas con la capacidad de establecer devoluciones de llamada, lo que creo que es la interfaz de nivel superior adecuada para el que debe ir. Es decir, escribirías algo como:

   singlequeue = Queue ()
   entrante_queue1.setcallback (singlequeue.put)
   entrante_queue2.setcallback (singlequeue.put)
   ...
   singlequeue.get ()

Tal vez el paquete de multiprocesamiento podría hacer crecer esta API pero aún no está allí. El concepto funciona bien con py.execnet que usa el término “canal” en lugar de “colas”, consulte aquí http://tinyurl.com/nmtr4w

Podría usar algo como el patrón Observer , en el que se notifica a los suscriptores de la cola los cambios de estado.

En este caso, podría tener su subproceso de trabajo designado como un oyente en cada cola, y cada vez que reciba una señal de listo, puede funcionar en el nuevo elemento, de lo contrario, dormir.

No estoy seguro de cómo funciona la selección en una cola de multiprocesamiento en Windows. Como seleccionar en Windows escucha sockets y no manejadores de archivos, sospecho que podría haber problemas.

Mi respuesta es hacer un hilo para escuchar cada cola de una manera bloqueada, y poner todos los resultados en una sola cola escuchada por el hilo principal, esencialmente multiplexando las colas individuales en una sola.

Mi código para hacer esto es:

 """ Allow multiple queues to be waited upon. queue,value = multiq.select(list_of_queues) """ import queue import threading class queue_reader(threading.Thread): def __init__(self,inq,sharedq): threading.Thread.__init__(self) self.inq = inq self.sharedq = sharedq def run(self): while True: data = self.inq.get() print ("thread reads data=",data) result = (self.inq,data) self.sharedq.put(result) class multi_queue(queue.Queue): def __init__(self,list_of_queues): queue.Queue.__init__(self) for q in list_of_queues: qr = queue_reader(q,self) qr.start() def select(list_of_queues): outq = queue.Queue() for q in list_of_queues: qr = queue_reader(q,outq) qr.start() return outq.get() 

La siguiente rutina de prueba muestra cómo usarlo:

 import multiq import queue q1 = queue.Queue() q2 = queue.Queue() q3 = multiq.multi_queue([q1,q2]) q1.put(1) q2.put(2) q1.put(3) q1.put(4) res=0 while not res==4: while not q3.empty(): res = q3.get()[1] print ("returning result =",res) 

Espero que esto ayude.

Tony Wallace

Nueva versión del código anterior …

No estoy seguro de cómo funciona la selección en una cola de multiprocesamiento en Windows. Como seleccionar en Windows escucha sockets y no manejadores de archivos, sospecho que podría haber problemas.

Mi respuesta es hacer un hilo para escuchar cada cola de una manera bloqueada, y poner todos los resultados en una sola cola escuchada por el hilo principal, esencialmente multiplexando las colas individuales en una sola.

Mi código para hacer esto es:

 """ Allow multiple queues to be waited upon. An EndOfQueueMarker marks a queue as "all data sent on this queue". When this marker has been accessed on all input threads, this marker is returned by the multi_queue. """ import queue import threading class EndOfQueueMarker: def __str___(self): return "End of data marker" pass class queue_reader(threading.Thread): def __init__(self,inq,sharedq): threading.Thread.__init__(self) self.inq = inq self.sharedq = sharedq def run(self): q_run = True while q_run: data = self.inq.get() result = (self.inq,data) self.sharedq.put(result) if data is EndOfQueueMarker: q_run = False class multi_queue(queue.Queue): def __init__(self,list_of_queues): queue.Queue.__init__(self) self.qList = list_of_queues self.qrList = [] for q in list_of_queues: qr = queue_reader(q,self) qr.start() self.qrList.append(qr) def get(self,blocking=True,timeout=None): res = [] while len(res)==0: if len(self.qList)==0: res = (self,EndOfQueueMarker) else: res = queue.Queue.get(self,blocking,timeout) if res[1] is EndOfQueueMarker: self.qList.remove(res[0]) res = [] return res def join(self): for qr in self.qrList: qr.join() def select(list_of_queues): outq = queue.Queue() for q in list_of_queues: qr = queue_reader(q,outq) qr.start() return outq.get() 

El siguiente código es mi rutina de prueba para mostrar cómo funciona:

 import multiq import queue q1 = queue.Queue() q2 = queue.Queue() q3 = multiq.multi_queue([q1,q2]) q1.put(1) q2.put(2) q1.put(3) q1.put(4) q1.put(multiq.EndOfQueueMarker) q2.put(multiq.EndOfQueueMarker) res=0 have_data = True while have_data: res = q3.get()[1] print ("returning result =",res) have_data = not(res==multiq.EndOfQueueMarker) 

A partir de Python 3.3 puede usar multiprocessing.connection.wait para esperar en varios objetos Queue._reader a la vez.

No lo hagas

Coloque un encabezado en los mensajes y envíelos a una cola común. Esto simplifica el código y será más limpio en general.