Celery periodic_task ejecutando varias veces en paralelo

Tengo un código periódico muy simple usando el enhebrado de Celery; simplemente imprime “Pre” y “Post” y duerme en el medio. Es una adaptación de esta pregunta de StackOverflow y este sitio web vinculado.

from celery.task import task from celery.task import periodic_task from django.core.cache import cache from time import sleep import main import cutout_score from threading import Lock import socket from datetime import timedelta from celery.decorators import task, periodic_task def single_instance_task(timeout): def task_exc(func): def wrapper(*args, **kwargs): lock_id = "celery-single-instance-" + func.__name__ acquire_lock = lambda: cache.add(lock_id, "true", timeout) release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: func() finally: release_lock() return wrapper return task_exc LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes @periodic_task(run_every = timedelta(seconds=2)) def test(): lock_id = "lock" # cache.add fails if if the key already exists acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE) # memcache delete is very slow, but we have to use it to take # advantage of using add() for atomic locking release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: print 'pre' sleep(20) print 'post' finally: release_lock() return print 'already in use...' 

Este código nunca se imprime 'already in use...' ; El mismo fenómeno ocurre cuando uso el decorador @single_instance_task .

¿Sabes lo que está mal?

Edición: he simplificado la pregunta para que no se escriba en la memoria (usando un caché global o django); Todavía nunca veo 'already in use...'


Edición: cuando agrego el siguiente código a mi archivo settings.py de Django (cambiando el código de https://docs.djangoproject.com/en/dev/topics/cache/ todo funciona como se espera, pero solo cuando uso el puerto 11211 (por extraño que parezca, mi servidor está en el puerto 8000)

 CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', 'LOCATION': [ '127.0.0.1:11211' ] } } 

¿Cómo estás ejecutando apio? No estoy familiarizado con una opción roscada.

Si se está ejecutando varios procesos, no hay variables “globales” que compartan la memoria entre los trabajadores.

Si desea compartir un contador entre todos los trabajadores, le sugiero que use cache.incr .

P.ej:

 In [1]: from django.core.cache import cache In [2]: cache.set('counter',0) In [3]: cache.incr('counter') Out[3]: 1 In [4]: cache.incr('counter') Out[4]: 2 

Actualizar

Qué sucede si obliga a sus tareas a superponerse al dormir, por ejemplo:

 print "Task on %r started" % (self,) sleep(20) print "Task on %r stopped" % (self,) 

Si no obtiene “ya en uso …” ejecutando esto con más frecuencia durante 20 segundos, entonces sabe que el caché no se comporta como se esperaba.


Otra actualización

¿Ha configurado un backend de caché en su configuración de django? Por ejemplo

Si no, puede que esté utilizando el caché ficticio , que en realidad no hace ningún almacenamiento en caché, solo implementa la interfaz … que suena como una causa convincente de su problema.