En Pika o RabbitMQ, ¿cómo verifico si algún consumidor está consumiendo actualmente?

Me gustaría verificar si un consumidor / trabajador está presente para consumir un mensaje que estoy a punto de enviar.

Si no hay ningún Trabajador , comenzaría algunos trabajadores (tanto los consumidores como los editores están en una sola máquina) y luego publicaría los Mensajes .

Si hay una función como connection.check_if_has_consumers , la implementaría de esta forma:

 import pika import workers # code for publishing to worker queue connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # if there are no consumers running (would be nice to have such a function) if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): # start the workers in other processes, using python's `multiprocessing` workers.start_workers() # now, publish with no fear of your queues getting filled up channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", properties=pika.BasicProperties(delivery_mode=2)) connection.close() 

Pero no puedo encontrar ninguna función con la funcionalidad check_if_has_consumers en pika .

¿Hay alguna manera de lograr esto, usando pika ? ¿O tal vez, hablando directamente con El Conejo ?

No estoy completamente seguro, pero realmente creo que RabbitMQ estaría al tanto del número de consumidores suscritos a diferentes colas, ya que les envía mensajes y acepta recibos.

Acabo de comenzar con RabbitMQ hace 3 horas … cualquier ayuda es bienvenida …

Aquí está el código workers.py que escribí, si es que te ayuda …

 import multiprocessing import pika def start_workers(num=3): """start workers as non-daemon processes""" for i in xrange(num): process = WorkerProcess() process.start() class WorkerProcess(multiprocessing.Process): """ worker process that waits infinitly for task msgs and calls the `callback` whenever it gets a msg """ def __init__(self): multiprocessing.Process.__init__(self) self.stop_working = multiprocessing.Event() def run(self): """ worker method, open a channel through a pika connection and start consuming """ connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.queue_declare(queue='worker_queue', auto_delete=False, durable=True) # don't give work to one worker guy until he's finished channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='worker_queue') # do what `channel.start_consuming()` does but with stopping signal while len(channel._consumers) and not self.stop_working.is_set(): channel.transport.connection.process_data_events() channel.stop_consuming() connection.close() return 0 def signal_exit(self): """exit when finished with current loop""" self.stop_working.set() def exit(self): """exit worker, blocks until worker is finished and dead""" self.signal_exit() while self.is_alive(): # checking `is_alive()` on zombies kills them time.sleep(1) def kill(self): """kill now! should not use this, might create problems""" self.terminate() self.join() def callback(channel, method, properties, body): """pika basic consume callback""" print 'GOT:', body # do some heavy lifting here result = save_to_database(body) print 'DONE:', result channel.basic_ack(delivery_tag=method.delivery_tag) 

EDITAR:

Tengo que seguir adelante, así que aquí hay una solución que voy a tomar, a menos que se presente un mejor enfoque,

Entonces, RabbitMQ tiene estas API de administración de HTTP , funcionan después de que haya activado el complemento de administración y en la mitad de la página de API de HTTP hay

/ api / connections – Una lista de todas las conexiones abiertas.

/ api / connections / name – Una conexión individual. BORRAR se cerrará la conexión.

Por lo tanto, si conecto mis Trabajadores y mis Producidos por nombres / usuarios de Conexión diferentes, podré verificar si la Conexión de Trabajador está abierta … (puede haber problemas cuando el trabajador muere …)

Estaremos esperando una mejor solución …

EDITAR:

acabo de encontrar esto en la documentación de rabbitmq, pero sería un error para Python:

 shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers Listing queues ... worker_queue 0 ...done. 

para que yo pudiera hacer algo como

 subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'") 

hacky … aún espero que pika tenga alguna función de python para hacer esto …

Gracias,

Yo también estaba investigando esto también. Después de leer la fuente y los documentos, encontré lo siguiente en channel.py:

 @property def consumer_tags(self): """Property method that returns a list of currently active consumers :rtype: list """ return self._consumers.keys() 

Mi propia prueba fue exitosa. Utilicé lo siguiente donde mi objeto de canal es self._channel:

 if len(self._channel.consumer_tags) == 0: LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.") ... 

De hecho, encontré esto en un accidente buscando un problema diferente, pero una cosa que puede ayudarlo es con la función Basic_Publish, hay un parámetro “Inmediato” cuyo valor predeterminado es Falso.

Una idea que podría hacer es establecer la Marca inmediata en Verdadero, que requerirá que un consumidor la consum de inmediato, en lugar de sentarse en una cola. Si un trabajador no está disponible para consumir el mensaje, rechazará un error y le indicará que inicie otro trabajador.

Dependiendo del rendimiento de su sistema, esto podría generar una gran cantidad de trabajadores adicionales, o generar trabajadores para reemplazar a los trabajadores muertos. Para el problema anterior, puede escribir un sistema similar a un administrador que simplemente rastree a los trabajadores a través de una cola de control, donde puede decirle a un “Runner” como un proceso para eliminar procesos de trabajadores que ya no son necesarios.