
Executing Celery tasks with dependency graph

I would like to have Celery tasks that depend on the results of two or more other tasks. I have looked into Python+Celery: Chaining jobs? and http://pypi.python.org/pypi/celery-tasktree , but those work well only when a task has a single dependency.

I know about TaskSet, but there does not seem to be a way to execute a callback the moment TaskSetResult.ready() becomes True. What I have in mind right now is a periodic task that polls TaskSetResult.ready() every few [milli]seconds and fires the callback once it returns True, but that sounds rather inelegant to me.

Any suggestions?


In recent versions of Celery (3.0+) you can use a so-called chord to achieve the desired effect:

From http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives:

Simple chord

The chord primitive enables us to add a callback to be called when all of the tasks in a group have finished executing, which is often required for algorithms that aren't embarrassingly parallel:

 >>> from celery import chord
 >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
 >>> res.get()
 90

Disclaimer: I haven't tried this myself yet.
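To make the semantics concrete without needing a broker or worker, here is a minimal synchronous sketch of what a chord does: run every task in the header group, collect their results, and hand the list to the callback. The names `add` and `xsum` mirror the docs snippet; `run_chord` is a hypothetical helper, not a Celery API.

```python
def run_chord(header, callback):
    """Toy synchronous stand-in for a Celery chord: execute every
    task in the header, then pass the collected results to the
    callback. Real Celery does this asynchronously via the broker."""
    results = [task() for task in header]
    return callback(results)

def add(x, y):
    return x + y

def xsum(numbers):
    return sum(numbers)

# Equivalent of chord((add.s(i, i) for i in range(10)), xsum.s())()
header = [lambda i=i: add(i, i) for i in range(10)]
print(run_chord(header, xsum))  # -> 90
```

The real chord differs in that the header tasks run in parallel on workers and the callback fires only once every header result has been stored in the result backend.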


mrbox is right: you can retry until the results are ready. However, it is not obvious from the docs that when you retry you have to pass along the setid and the subtask ids, and that to rebuild the result you have to use map. The sample code below shows what I mean.

def run(self, setid=None, subtasks=None, **kwargs):

    if not setid or not subtasks:
        # First launch of this task: spawn the subtasks
        …
        tasks = []
        for slice in slices:
            tasks.append(uploadTrackSlice.subtask((slice, folder_name)))

        job = TaskSet(tasks=tasks)
        task_set_result = job.apply_async()
        setid = task_set_result.taskset_id
        subtasks = [result.task_id for result in task_set_result.subtasks]
        self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])

    # This is a retry, so we just have to check the results
    tasks_result = TaskSetResult(setid, map(AsyncResult, subtasks))
    if not tasks_result.ready():
        self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])
    else:
        if tasks_result.successful():
            return tasks_result.join()
        else:
            raise Exception("Some of the tasks failed")


IMHO you can do something similar to what is done in the docs: link

Or you can use the retry method with max_retries=None: if .ready() is False for one of the 'base' tasks, you can call .retry() until both 'base' tasks have completed.
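The retry-until-ready idea above can be sketched without a broker as a simple poll-then-fire loop. `FakeResult` and `wait_then_callback` are hypothetical stand-ins for AsyncResult and the retrying task; in real Celery the re-check would happen via self.retry() on the countdown schedule rather than a sleep loop.

```python
import time

class FakeResult:
    """Stand-in for AsyncResult: becomes ready after a deadline."""
    def __init__(self, ready_at, value):
        self.ready_at = ready_at
        self.value = value

    def ready(self):
        return time.monotonic() >= self.ready_at

def wait_then_callback(results, callback, poll=0.01):
    """Poll until every result is ready, then fire the callback
    with the collected values."""
    while not all(r.ready() for r in results):
        time.sleep(poll)
    return callback([r.value for r in results])

now = time.monotonic()
results = [FakeResult(now + 0.02, 40), FakeResult(now + 0.03, 2)]
print(wait_then_callback(results, sum))  # -> 42
```

With self.retry() instead of sleep, the worker is freed between checks, which is why the retry approach scales better than a busy polling task.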

