Detectar si el apio está disponible / en ejecución

Estoy usando apio para gestionar tareas asíncronas. De vez en cuando, sin embargo, el proceso de apio disminuye, lo que hace que ninguna de las tareas se ejecute. Me gustaría poder verificar el estado del apio y asegurarme de que todo funciona bien y, si detecto algún problema, aparece un mensaje de error para el usuario. De la documentación de Celery Worker, parece que podría usar ping o inspeccionar para esto, pero ping se siente mal y no está claro exactamente cómo se debe usar inspeccionar (si inspect (). Registered () ¿está vacío?).

Cualquier orientación sobre esto sería apreciada. Básicamente lo que busco es un método así:

def celery_is_alive(): from celery.task.control import inspect return bool(inspect().registered()) # is this right?? 

EDITAR: Ni siquiera parece que registered () está disponible en apio 2.3.3 (a pesar de que la documentación 2.1 lo enumera). Tal vez ping es la respuesta correcta.

EDIT: Ping tampoco parece hacer lo que pensé que haría, así que todavía no estoy seguro de la respuesta aquí.

Aquí está el código que he estado usando. celery.task.control.Inspect.stats() devuelve un dictado que contiene muchos detalles sobre los trabajadores disponibles actualmente, Ninguno si no hay trabajadores en ejecución, o genera un IOError si no puede conectarse al intermediario de mensajes. Estoy utilizando RabbitMQ: es posible que otros sistemas de mensajería se comporten de forma ligeramente diferente. Esto funcionó en Celery 2.3.xy 2.4.x; No estoy seguro de lo lejos que va.

 def get_celery_worker_status(): ERROR_KEY = "ERROR" try: from celery.task.control import inspect insp = inspect() d = insp.stats() if not d: d = { ERROR_KEY: 'No running Celery workers were found.' } except IOError as e: from errno import errorcode msg = "Error connecting to the backend: " + str(e) if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED': msg += ' Check that the RabbitMQ server is running.' d = { ERROR_KEY: msg } except ImportError as e: d = { ERROR_KEY: str(e)} return d 

Lo siguiente me funcionó:

 import socket from kombu import Connection celery_broker_url = "amqp://localhost" try: conn = Connection(celery_broker_url) conn.ensure_connection(max_retries=3) except socket.error: raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url)) 

Para verificar lo mismo usando la línea de comandos en caso de que apio se esté ejecutando como demonio,

  • Active virtualenv y vaya al directorio donde está la ‘aplicación’
  • Ahora ejecuta: celery -A [app_name] status
  • Se mostrará si el apio está arriba o no, más no. de nodos en linea

Fuente: http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

De la documentación de apio 4.2 :

 from your_celery_app import app def get_celery_worker_status(): i = app.control.inspect() stats = i.stats() registered_tasks = i.registered() active_tasks = i.active() scheduled_tasks = i.scheduled() result = { 'stats': stats, 'registered_tasks': registered_tasks, 'active_tasks': active_tasks, 'scheduled_tasks': scheduled_tasks } return result 

por supuesto que podría / debería mejorar el código con el manejo de errores …

Un método para probar si algún trabajador está respondiendo es enviar una transmisión de ‘ping’ y regresar con un resultado exitoso en la primera respuesta.

 from .celery import app # the celery 'app' created in your project def is_celery_working(): result = app.control.broadcast('ping', reply=True, limit=1) return bool(result) # True if at least one result 

Esto transmite un ‘ping’ y esperará hasta un segundo para las respuestas. Tan pronto como llegue la primera respuesta, devolverá un resultado. Si desea un resultado False más rápido, puede agregar un argumento de timeout para reducir el tiempo de espera antes de rendirse.