Celery and routing
I need to run some tasks on the specific celeryd instance. So I configured queues:
celeryconfig.py:
CELERY_QUEUES = {
'celery': {
'exchange': 'celery',
'binding_key': 'celery',
},
'import': {
'exchange': 'import',
'binding_key': 'import.products',
},
}
CELERY_ROUTES = {
'celery_tasks.import_tasks.test': {
'queue': 'import',
'routing_key': 'import.products',
},
}
import_tasks.py:
@task
def test():
print 'test'
@task(exc开发者_Python百科hange='import', routing_key='import.products')
def test2
print 'test2'
then I start celeryd:
celeryd -c 2 -l INFO -Q import
And try to execute that tasks. 'test' executes but 'test2' do not. But I don't want to specify every importing task in the CELERY_ROUTES. How can I specify which queue should execute task in the task definition?
Oh, forgot to say that I've used send_task function to execute tasks. And this function doesn't import tasks. It just sends the name of the task to the queue.
So instead of this:
from celery.execute import send_task
result = send_task(args.task, task_args, task_kwargs)
I wrote:
from celery import current_app as celery_app, registry as celery_registry
celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
for module in celery_imports:
__import__(module)
task = celery_registry.tasks.get(args.task)
if task:
result = task.apply_async(task_args, task_kwargs)
See Roman's solution -- http://www.imankulov.name/posts/celery-for-internal-api.html -- to access tasks by name, but also with ability to specify queues and whatnot as if you imported the task module.
I found solution that almost satisfied me:
class CustomRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task.startswith('celery_tasks.import_tasks'):
return {'exchange': 'import',
'routing_key': 'import.products'}
CELERY_ROUTES = (
CustomRouter(),
)
Problem is that now I can't use names for tasks.
精彩评论