Celery task schedule (Ensuring a task is only executed one at a time)
It is invalid to rely on local variables, since you can have several Celery workers running tasks, and those workers might even be on different hosts. So, basically, there are as many is_locked variable instances as there are Celery workers running your async_work task. Thus, even though your code won't raise any errors, you won't get the desired effect with it.
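To make the failure mode concrete, here is a minimal sketch of the broken pattern (the is_locked flag is assumed from the question; do_some_work is a placeholder): every worker process imports its own copy of the module, so each process checks and sets its own independent flag.

from celery import shared_task

is_locked = False  # module-level state: every worker process gets its own copy

@shared_task
def async_work(info):
    global is_locked
    if is_locked:        # only sees the flag of THIS worker process
        return
    is_locked = True     # invisible to workers in other processes or hosts
    try:
        do_some_work()
    finally:
        is_locked = False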
To achieve your goal you need to configure Celery to run only one worker process. Since such a worker can process only a single task at any given time, you get what you need.
EDIT:
According to Workers Guide > Concurrency:
By default multiprocessing is used to perform concurrent execution of tasks, but you can also use Eventlet. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of CPUs available on the machine.
Thus you need to run the worker like this:
$ celery worker --concurrency=1
EDIT 2:
Surprisingly, there's another solution; moreover, it is even in the official docs: see the Ensuring a task is only executed one at a time article.
You probably don't want to use concurrency=1 for your Celery workers, since you want your tasks to be processed concurrently. Instead, you can use some kind of locking mechanism. Just ensure the cache timeout is bigger than the time it takes to finish your task.
Redis
import redis
from contextlib import contextmanager

from celery import shared_task

redis_client = redis.Redis(host='localhost', port=6379)

LOCK_EXPIRE = 60 * 10  # the lock expires on its own after 10 minutes

@contextmanager
def redis_lock(lock_name):
    """Yield True if lock_name was not already set in Redis (the lock was
    acquired), otherwise None. Enables a sort of lock functionality.
    """
    # nx=True means "set only if the key does not exist", so exactly one
    # client can acquire the lock; ex= makes it expire automatically in
    # case the worker dies before releasing it.
    status = redis_client.set(lock_name, 'lock', nx=True, ex=LOCK_EXPIRE)
    try:
        yield status
    finally:
        # Release the lock only if we actually acquired it; deleting
        # unconditionally would remove a lock held by another worker.
        if status:
            redis_client.delete(lock_name)

@shared_task
def async_work(info):
    with redis_lock('my_lock_name') as acquired:
        if acquired:
            do_some_work()
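If you'd rather have the task try again later than silently skip the work when the lock is held, you can combine the same lock with Celery's built-in retry mechanism. A sketch (the 10-second countdown is an arbitrary choice):

from celery import shared_task

@shared_task(bind=True, max_retries=None)
def async_work(self, info):
    with redis_lock('my_lock_name') as acquired:
        if acquired:
            do_some_work()
        else:
            # Another worker holds the lock: re-queue this task and try
            # again in 10 seconds instead of dropping it.
            raise self.retry(countdown=10)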
Memcache
Example inspired by the Celery documentation:
from contextlib import contextmanager

from celery import shared_task
from django.core.cache import cache

LOCK_EXPIRE = 60 * 10  # the lock expires on its own after 10 minutes

@contextmanager
def memcache_lock(lock_name):
    """Yield True if the lock was acquired, False if another worker holds it."""
    # cache.add stores the key only if it does not exist yet and reports
    # whether the store succeeded; on the memcached backend this is atomic.
    status = cache.add(lock_name, 'lock', LOCK_EXPIRE)
    try:
        yield status
    finally:
        # Release the lock only if we actually acquired it.
        if status:
            cache.delete(lock_name)

@shared_task
def async_work(info):
    with memcache_lock('my_lock_name') as acquired:
        if acquired:
            do_some_work()
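For illustration, with either lock in place overlapping invocations behave like this (assuming a broker and a worker are running):

async_work.delay('some info')  # acquires the lock and runs do_some_work()
async_work.delay('some info')  # overlapping call: acquired is falsy, so the
                               # work is skipped while the first call runs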