How to tell if a task has already been queued in django-celery?
Here's my setup:
- django 1.3
- celery 2.2.6
- django-celery 2.2.4
- djkombu 0.9.2
In my settings.py file I have
BROKER_BACKEND = "djkombu.transport.DatabaseTransport"
i.e. I'm just using the database to queue tasks.
Now on to my problem: I have a user-initiated task that could take a few minutes to complete. I want the task to only run once per user, and I will cache the results of the task in a temporary file so if the user initiates the task again I just return the cached file. I have code that looks like this in my view function:
task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)
if result.state == celery.states.PENDING:
    # The next line makes a duplicate task if the user rapidly refreshes the page
    tasks.some_long_task.apply_async(task_id=task_id)
    return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
    return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
    if cached_file_still_exists():
        return get_cached_file()
    else:
        result.forget()
        tasks.some_long_task.apply_async(task_id=task_id)
        return HttpResponse("Task started...")
This code almost works. But I'm running into a problem when the user rapidly reloads the page. There's a 1-3 second delay between when the task is queued and when it is finally pulled off the queue and handed to a worker. During this time, the task's state remains PENDING, which causes the view logic to kick off a duplicate task.
What I need is some way to tell if the task has already been submitted to the queue so I don't end up submitting it twice. Is there a standard way of doing this in celery?
I solved this with Redis. Just set a key in Redis for each task, then remove the key in the task's after_return method. Redis is lightweight and fast.
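A minimal sketch of this approach, assuming a redis-py client; the key-naming and helper function names here are illustrative, not from the original answer:

```python
LOCK_EXPIRE = 60 * 5  # safety timeout so a crashed worker can't hold the lock forever


def lock_key(user_id):
    return "lock:long-task-%d" % user_id


def try_acquire(client, user_id):
    """Return True if we got the lock and may call apply_async()."""
    key = lock_key(user_id)
    # SETNX is atomic: only one concurrent request can create the key
    if client.setnx(key, "1"):
        client.expire(key, LOCK_EXPIRE)
        return True
    return False  # task is already queued or running


def release(client, user_id):
    client.delete(lock_key(user_id))
```

In the view you would call `try_acquire(redis.Redis(), user_id)` before queueing, and call `release()` from the task's after_return method so the lock disappears whether the task succeeds or fails.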
I don't think (as Tomek and others have suggested) that using the database is the way to do this locking. Django has a built-in cache framework, which should be sufficient to accomplish this locking, and is much faster. See:
http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial
Django can be configured to use memcached as its cache backend, and this can be distributed across multiple machines ... this seems better to me. Thoughts?
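The cookbook lock linked above hinges on the fact that cache.add() is atomic on the memcached backend: it stores the key only if it does not already exist. A rough sketch of that pattern (helper names are illustrative; cache is whatever Django's django.core.cache.cache gives you):

```python
LOCK_EXPIRE = 60 * 5  # lock expires on its own if the task never returns


def acquire_lock(cache, lock_id):
    # add() returns True only if the key was not already present,
    # so exactly one concurrent request wins the lock
    return cache.add(lock_id, "locked", LOCK_EXPIRE)


def release_lock(cache, lock_id):
    cache.delete(lock_id)
```

In the view: `if acquire_lock(cache, "long-task-%d" % user_id): tasks.some_long_task.apply_async(...)`; release the lock from the task when it finishes.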
You can cheat a bit by storing the result manually in the database. Let me explain how this will help.
For example, if using RDBMS (table with columns - task_id, state, result):
View part:
- Use transaction management.
- Use SELECT FOR UPDATE to get row where task_id == "long-task-%d" % user_id. SELECT FOR UPDATE will block other requests until this one COMMITs or ROLLBACKs.
- If the row doesn't exist - insert it with state PENDING, start 'some_long_task', and end the request.
- If the state is PENDING - inform the user.
- If the state is SUCCESS - set state to PENDING, start the task, return the file pointed to by 'result' column. I base this on the assumption, that you want to re-run the task on getting the result. COMMIT
- If the state is ERROR - set state to PENDING, start the task, inform the user. COMMIT
Task part:
- Prepare the file, wrapping the work in a try/except block.
- On success - UPDATE the proper row with state = SUCCESS, result.
- On failure - UPDATE the proper row with state = ERROR.
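The view-side state machine above can be sketched as plain Python over a task_id -> row mapping standing in for the table; in a real RDBMS the lookup would be SELECT ... FOR UPDATE inside a transaction so concurrent requests block until COMMIT. The function and store names are illustrative:

```python
PENDING, SUCCESS, ERROR = "PENDING", "SUCCESS", "ERROR"


def handle_request(rows, task_id, start_task):
    """rows maps task_id -> {"state": ..., "result": ...} (stand-in for the table)."""
    row = rows.get(task_id)
    if row is None:
        # No row yet: record PENDING and queue the task
        rows[task_id] = {"state": PENDING, "result": None}
        start_task(task_id)
        return "started"
    if row["state"] == PENDING:
        return "still running"
    # SUCCESS or ERROR: reset to PENDING and re-queue either way;
    # on SUCCESS also hand back the cached result (the file path)
    result = row["result"] if row["state"] == SUCCESS else None
    row["state"], row["result"] = PENDING, None
    start_task(task_id)
    return result if result is not None else "started"
```

The task side then UPDATEs the same row to SUCCESS (with the result) or ERROR when it returns.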