开发者

django celery: how to set task to run at specific interval programmatically

I found that I can set the task to run at specific interval at specific times from here, but that was only done dur开发者_Python百科ing task declaration. How do I set a task to run periodically dynamically?


The schedule is derived from a setting, and thus seems to be immutable at runtime.

You can probably accomplish what you're looking for using Task ETAs. This guarantees that your task won't run before the desired time, but doesn't promise to run the task at the designated time—if the workers are overloaded at the designated ETA, the task may run later.

If that restriction isn't an issue, you could write a task which would first run itself like:

@task
def mytask():
    keep_running = # Boolean, should the task keep running?
    if keep_running:
        run_again = # calculate when to run again
        mytask.apply_async(eta=run_again)
    # ... do the stuff you came here to do ...

The major downside of this approach is that you are relying on the taskstore to remember the tasks in flight. If one of them fails before firing off the next one, then the task will never run again. If your broker isn't persisted to disk and it dies (taking all in-flight tasks with it), then none of those tasks will run again.

You could solve these issues with some kind of transaction logging and a periodic "nanny" task whose job it is to find such repeating tasks that died an untimely death and revive them.

If I had to implement what you've described, I think this is how I would approach it.


celery.task.base.PeriodicTask defines is_due which determines when the next run should be. You could override this function to contain your custom dynamic running logic. See the docs here: http://docs.celeryproject.org/en/latest/reference/celery.task.base.html?highlight=is_due#celery.task.base.PeriodicTask.is_due

An example:

import random
from celery.task import PeriodicTask

class MyTask(PeriodicTask):

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Running my task")

    def is_due(self, last_run_at):
        # Add your logic for when to run. Mine is random
        if random.random() < 0.5:
            # Run now and ask again in a minute
            return (True, 60)
        else:
            # Don't run now but run in 10 secs
            return (True, 10)


see here http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

i think you can't make it dynamically ... best way is create task in task :D

for example you want run something for X sec later then you create new task with x sec delay and in this task create another task for N*X sec delay ...


This should help you some... http://celery.readthedocs.org/en/latest/faq.html#can-i-change-the-interval-of-a-periodic-task-at-runtime

Once you've defined a custom schedule, assign it to your task as asksol has suggested above.

CELERYBEAT_SCHEDULE = {    
    "my_name": {
        "task": "myapp.tasks.task",
        "schedule": myschedule(),    
    }
}

You might also want to modify CELERYBEAT_MAX_LOOP_INTERVAL if you want your schedule to update more often than every five minutes.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜