Ejecutando tareas “únicas” con apio.

Utilizo apio para actualizar las fonts RSS en mi sitio de agregación de noticias. Uso una @task para cada feed, y las cosas parecen funcionar bien.

Sin embargo, hay un detalle que no estoy seguro de manejar bien: todas las fonts se actualizan una vez por minuto con @periodic_task, pero ¿qué sucede si una fuente se está actualizando desde la última tarea periódica cuando se inicia una nueva? (por ejemplo, si la fuente es muy lenta o está fuera de línea y la tarea se mantiene en un ciclo de rebash)

Actualmente almaceno los resultados de las tareas y verifico su estado de esta manera:

import socket from datetime import timedelta from celery.decorators import task, periodic_task from aggregator.models import Feed _results = {} @periodic_task(run_every=timedelta(minutes=1)) def fetch_articles(): for feed in Feed.objects.all(): if feed.pk in _results: if not _results[feed.pk].ready(): # The task is not finished yet continue _results[feed.pk] = update_feed.delay(feed) @task() def update_feed(feed): try: feed.fetch_articles() except socket.error, exc: update_feed.retry(args=[feed], exc=exc) 

Tal vez haya una forma más sofisticada / robusta de lograr el mismo resultado utilizando algún mecanismo de apio que no pude ver.

De la documentación oficial: Asegurar que una tarea solo se ejecute una a la vez .

Basado en la respuesta de MattH, podrías usar un decorador como este:

 def single_instance_task(timeout): def task_exc(func): @functools.wraps(func) def wrapper(*args, **kwargs): lock_id = "celery-single-instance-" + func.__name__ acquire_lock = lambda: cache.add(lock_id, "true", timeout) release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: func(*args, **kwargs) finally: release_lock() return wrapper return task_exc 

entonces, utilízalo como tal …

 @periodic_task(run_every=timedelta(minutes=1)) @single_instance_task(60*10) def fetch_articles() yada yada... 

El uso de https://pypi.python.org/pypi/celery_once parece hacer el trabajo realmente bien, incluido el informe de errores y las pruebas de algunos parámetros para determinar su exclusividad.

Puedes hacer cosas como:

 from celery_once import QueueOnce from myapp.celery import app from time import sleep @app.task(base=QueueOnce, once=dict(keys=('customer_id',))) def start_billing(customer_id, year, month): sleep(30) return "Done!" 

que solo necesita los siguientes ajustes en tu proyecto:

 ONCE_REDIS_URL = 'redis://localhost:6379/0' ONCE_DEFAULT_TIMEOUT = 60 * 60 # remove lock after 1 hour in case it was stale 

Si está buscando un ejemplo que no usa Django, intente este ejemplo (caveat: usa Redis, que ya estaba usando).

El código de decoración es el siguiente (crédito completo para el autor del artículo, vaya a leerlo)

 import redis REDIS_CLIENT = redis.Redis() def only_one(function=None, key="", timeout=None): """Enforce only one celery task at a time.""" def _dec(run_func): """Decorator.""" def _caller(*args, **kwargs): """Caller.""" ret_value = None have_lock = False lock = REDIS_CLIENT.lock(key, timeout=timeout) try: have_lock = lock.acquire(blocking=False) if have_lock: ret_value = run_func(*args, **kwargs) finally: if have_lock: lock.release() return ret_value return _caller return _dec(function) if function is not None else _dec 

Esta solución para apio trabajando en un solo host con una mayor concurrencia 1. Otros tipos (sin dependencias como redis) de lockings de diferencia basados ​​en archivos no funcionan con una concurrencia mayor 1.

 class Lock(object): def __init__(self, filename): self.f = open(filename, 'w') def __enter__(self): try: flock(self.f.fileno(), LOCK_EX | LOCK_NB) return True except IOError: pass return False def __exit__(self, *args): self.f.close() class SinglePeriodicTask(PeriodicTask): abstract = True run_every = timedelta(seconds=1) def __call__(self, *args, **kwargs): lock_filename = join('/tmp', md5(self.name).hexdigest()) with Lock(lock_filename) as is_locked: if is_locked: super(SinglePeriodicTask, self).__call__(*args, **kwargs) else: print 'already working' class SearchTask(SinglePeriodicTask): restart_delay = timedelta(seconds=60) def run(self, *args, **kwargs): print self.name, 'start', datetime.now() sleep(5) print self.name, 'end', datetime.now()