
Task management daemon

I have to run some long (2-3 days, I think) tasks over my Django ORM data. I looked around and didn't find any good solutions.

django-tasks - http://code.google.com/p/django-tasks/ is not well documented, and I have no idea how to use it.

celery - http://ask.github.com/celery/ seems excessive for my tasks. Is it good for long-running tasks?

So, what I need to do is just get all the data, or part of the data, from my database, like:

Entry.objects.all()

And then I need to execute the same function on each item of the QuerySet.

I think it will run for around 2-3 days.
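Roughly, the naive single-process version of what I want would look like the sketch below (`process_entry` is just a placeholder for my real per-entry function):

from app.models import Entry

def process_all_entries():
    # iterator() streams rows instead of caching the whole QuerySet in memory
    for entry in Entry.objects.all().iterator():
        process_entry(entry)  # placeholder for the real per-entry work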

So, maybe someone can explain to me how to build this.

P.S.: At the moment I have only one idea: use cron and the database to store the processing timeline.
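For the record, that cron-based fallback would be something like the sketch below: a management command that processes one batch per run and remembers how far it got. The `AnalysisCheckpoint` model is hypothetical bookkeeping I made up for this sketch, not an existing app.

from django.core.management.base import BaseCommand
from app.models import Entry, AnalysisCheckpoint  # AnalysisCheckpoint: made-up model with a last_entry_id field (default 0)

BATCH_SIZE = 500

class Command(BaseCommand):
    help = "Process the next batch of entries; meant to be run repeatedly from cron."

    def handle(self, *args, **options):
        # a single row that records the id of the last processed entry
        checkpoint, _ = AnalysisCheckpoint.objects.get_or_create(pk=1)
        batch = (Entry.objects
                 .filter(id__gt=checkpoint.last_entry_id)
                 .order_by('id')[:BATCH_SIZE])
        for entry in batch:
            process_entry(entry)                 # placeholder for the per-entry work
            checkpoint.last_entry_id = entry.id  # checkpoint after every entry
            checkpoint.save()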


Use Celery sub-tasks. This will allow you to start a long-running task (with many short-running subtasks underneath it), and keep good data on its execution status within Celery's task result store. As an added bonus, subtasks will be spread across worker processes, allowing you to take full advantage of multi-core servers or even multiple servers in order to reduce task runtime.

  • http://ask.github.com/celery/userguide/tasksets.html#task-sets
  • http://docs.celeryproject.org/en/latest/reference/celery.task.sets.html

EDIT: example:

import time, logging as log
from celery.task import task
from celery.task.sets import TaskSet
from app.models import Entry

@task(send_error_emails=True)
def long_running_analysis():
    entries = list(Entry.objects.all().values('id'))
    num_entries = len(entries)
    taskset = TaskSet(tasks=[analyse_entry.subtask((entry['id'],))
                             for entry in entries])
    results = taskset.apply_async()
    while not results.ready():
        time.sleep(10000)  # seconds to wait between progress checks
        log.info("long_running_analysis is %d%% complete",
                 results.completed_count() * 100 / num_entries)
    if results.failed():
        log.error("Analysis Failed!")
    result_set = results.join() # brings back results in 
                                # the order of entries
    #perform collating or count or percentage calculations here
    log.error("Analysis Complete!")

@task
def analyse_entry(id): # inputs must be serialisable
    logger = analyse_entry.get_logger()
    entry = Entry.objects.get(id=id)
    try:
        analysis = entry.analyse()
        logger.info("'%s' found to be %s.", entry, analysis['status'])
        return analysis # must be a dict or serialisable.
    except Exception as e:
        logger.error("Could not process '%s': %s", entry, e)
        return None 
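To actually start this off (assuming a Celery worker is already running against the same Django settings), you queue the parent task rather than calling it directly; something like the following, where the app.tasks import path is an assumption about where the task module lives:

from app.tasks import long_running_analysis

result = long_running_analysis.delay()  # returns immediately; the worker does the work
print result.task_id                    # keep the id if you want to poll status later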

If your calculations cannot be segregated into per-entry tasks, you can always set it up so that one subtask performs tallies and another subtask performs a different analysis type. This will still work, and will still allow you to benefit from parallelism; see the sketch below.
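For example, a TaskSet can just as easily dispatch a few different subtasks in parallel; `tally_entries` and `summarise_entries` below are hypothetical @task functions standing in for whatever aggregate steps you need:

from celery.task.sets import TaskSet
from app.tasks import tally_entries, summarise_entries  # placeholders, not real tasks

taskset = TaskSet(tasks=[
    tally_entries.subtask(),      # e.g. count entries per status
    summarise_entries.subtask(),  # e.g. compute averages over all entries
])
results = taskset.apply_async()
tallies, summary = results.join()  # results come back in dispatch order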
