开发者

Celery定时任务组件之Django+Celery项目实战教程

目录
  • 一、项目初始化
    • 1. 创建虚拟环境并安装依赖
    • 2. 创建 Django 项目和应用
    • 3. 配置项目(task_manager/settings.py)
  • 二、Celery 集成配置
    • 1. 创建 Celery 应用(task_manager/celery.py)
    • 2. 初始化 Celery(task_manager/__init__.py)
  • 三、Model 开发
    • 创建任务模型(tasks/models.py)
    • 迁移数据库
  • 四、接口开发
    • 1. 创建序列化器(tasks/serializers.py)
    • 2. 创建视图集(tasks/views.py)
    • 3. 配置 URL(tasks/urls.py)
    • 4. 项目 URL 配置(task_manager/urls.py)
  • 五、创建示例任务
    • 定义任务函数(tasks/tasks.py)
  • 六、启动服务
    • 1. 启动 Redis
    • 2. 启动 Celery Worker
    • 3. 启动 Celery Beat
    • 4. 启动 Django 开发服务器
  • 七、Awww.devze.comPI 测试
    • 1. 创建周期性任务
    • 2. 查看任务列表
    • 3. 查看执行日志
  • 项目结构
    • 关键特性说明
      • 扩展建议
        • 总结

          一、项目初始化

          1. 创建虚拟环境并安装依赖

          # 创建虚拟环境
          python3 -m venv myenv
          source myenv/bin/activate
          
          # 安装依赖
          pip install django celery redis django-celery-beat
          

          2. 创建 Django 项目和应用

          # 创建项目
          django-admin startproject task_manager
          cd task_manager
          
          # 创建应用
          python manage.py startapp tasks
          

          3. 配置项目(task_manager/settings.py)

          INSTALLED_APPS = [
              # ...
              'django_celery_beat',
              'django_celery_results',
              'tasks',
          ]
          
          # 数据库配置
          DATABASES = {
              'default': {
                  'ENGINE': 'django.db.back编程客栈ends.SQLite3',
                  'NAME': BASE_DIR / 'db.sqlite3',
              }
          }
          
          # Celery配置
          CELERY_BROKER_URL = 'redis://localhost:6379/0'
          CELERY_RESULT_BACKEND = 'django-db'  # android使用django-celery-results存储结果
          CELERY_ACCEPT_CONTENT = ['json']
          CELERY_TASK_SERIALIZER = 'json'
          CELERY_RESULT_SERIALIZER = 'json'
          CELERY_TIMEZONE = 'Asia/Shanghai'
          

          二、Celery 集成配置

          1. 创建 Celery 应用(task_manager/celery.py)

          from __future__ import absolute_import, unicode_literals
          import os
          from celery import Celery
          
          os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'task_manager.settings')
          
          app = Celery('task_manager')
          app.config_from_object('django.conf:settings', namespace='CELERY')
          app.autodiscover_tasks()
          
          @app.task(bind=True)
          def debug_task(self):
              print(f'Request: {self.request!r}')
          

          2. 初始化 Celery(task_manager/__init__.py)

          from __future__ import absolute_import, unicode_literals
          from .celery import app as celery_app
          
          __all__ = ('celery_app',)
          

          三、Model 开发

          创建任务模型(tasks/models.py)

          from django.db import models
          from django.utils import timezone
          
          class ScheduledTask(models.Model):
              TASK_TYPES = (
                  ('periodic', '周期性任务'),
                  ('one_time', '一次性任务'),
              )
              
              name = models.CharField('任务名称', max_length=100)
              task_type = models.CharField('任务类型', max_length=20, choices=TASK_TYPES)
              task_function = models.CharField('任务函数', max_length=200)
              cron_expression = models.CharField('Cron表达式', max_length=100, blank=True, null=True)
              interval_seconds = models.IntegerField('间隔秒数', blank=True, null=True)
              next_run_time = models.DateTimeField('下次执行时间', blank=True, null=True)
              is_active = models.BooleanField('是否激活', default=True)
              created_at = models.DateTimeField('创建时间', auto_now_add=True)
              updated_at = models.DateTimeField('更新时间', auto_now=True)
              
              def __str__(self):
                  return self.name
              
              class Meta:
                  verbose_name = '定时任务'
                  verbose_name_plural = '定时任务列表'
          
          class TaskExecutionLog(models.Model):
              task = models.ForeignKey(ScheduledTask, on_delete=models.CASCADE, related_name='logs')
              execution_time = models.DateTimeField('执行时间', auto_now_add=True)
              status = models.CharField('执行状态', max_length=20, choices=(
                  ('success', '成功'),
                  ('failed', '失败'),
              ))
              result = models.TextField('执行结果', blank=True, null=True)
              error_messapythonge = models.TextField('错误信息', blank=True, null=True)
              
              def __str__(self):
                  return f"{self.task.name} - {self.execution_time}"
              
              class Meta:
                  verbose_name = '任务执行日志'
                  verbose_name_plural = '任务执行日志列表'
          

          迁移数据库

          python manage.py makemigrations
          python manage.py migrate
          

          四、接口开发

          1. 创建序列化器(tasks/serializers.py)

          from rest_framework import serializers
          from .models import ScheduledTask, TaskExecutionLog
          
          class ScheduledTaskSerializer(serializers.ModelSerializer):
              class Meta:
                  model = ScheduledTask
                  fields = '__all__'
          
          class TaskExecutionLogSerializer(serializers.ModelSerializer):
              class Meta:
                  model = TaskExecutionLog
                  fields = '__all__'
          

          2. 创建视图集(tasks/views.py)

          from rest_framework import viewsets, status
          from rest_framework.response import Response
          from .models import ScheduledTask, TaskExecutionLog
          from .serializers import ScheduledTaskSerializer, TaskExecutionLogSerializer
          from celery import current_app
          from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
          import json
          
          class ScheduledTaskViewSet(viewsets.ModelViewSet):
              queryset = ScheduledTask.objects.all()
              serializer_class = ScheduledTaskSerializer
              
              def create(self, request, *args, **kwargs):
                  serializer = self.get_serializer(data=request.data)
                  serializer.is_valid(raise_exception=True)
                  
                  # 创建Celery定时任务
                  task = serializer.save()
                  self._create_celery_task(task)
                  
                  headers = self.get_success_headers(serializer.data)
                  return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
              
              def update(self, request, *args, **kwargs):
                  partial = kwargs.pop('partial', False)
                  instance = self.get_object()
                  serializer = self.get_serializer(instance, data=request.data, partial=partial)
                  serializer.is_valid(raise_exception=True)
                  
                  # 更新Celery定时任务
                  task = serializer.save()
                  self._update_celery_task(task)
                  
                  return Response(serializer.data)
              
              def destroy(self, request, *args, **kwargs):
                  instance = self.get_object()
                  
                  # 删除Celery定时任务
                  self._delete_celery_task(instance)
                  
                  self.perform_destroy(instance)
                  return Response(status=status.HTTP_204_NO_CONTENT)
              
              def _create_celery_task(self, task):
                  if task.task_type == 'periodic':
                      # 创建间隔调度
                      schedule, _ = IntervalSchedule.objects.get_or_create(
                          every=task.interval_seconds,
                          period=IntervalSchedule.SECONDS,
                      )
                      PeriodicTask.objects.create(
                          interval=schedule,
                          name=task.name,
                          task=task.task_function,
                          enabled=task.is_active,
                          args=json.dumps([]),
                          kwargs=json.dumps({}),
                      )
                  elif task.task_type == 'one_time':
                      # 一次性任务使用ETA
                      pass
              
              def _update_celery_task(self, task):
                  try:
                      periodic_task = PeriodicTask.objects.get(name=task.name)
                      if task.task_type == 'periodic':
                          schedule, _ = IntervalSchedule.objects.get_or_create(
                              every=task.interval_seconds,
                              period=IntervalSchedule.SECONDS,
                          )
                          periodic_task.interval = schedule
                      periodic_task.enabled = task.is_active
                      periodic_task.save()
                  except PeriodicTask.DoesNotExist:
                      self._create_celery_task(task)
              
              def _delete_celery_task(self, task):
                  try:
                      periodic_task = PeriodicTask.objects.get(name=task.name)
                      periodic_task.delete()
                  except PeriodicTask.DoesNotExist:
                      pass
          
          class TaskExecutionLogViewSet(viewsets.ReadOnlyModelViewSet):
              queryset = TaskExecutionLog.objects.all()
              serializer_class = TaskExecutionLogSerializer
          

          3. 配置 URL(tasks/urls.py)

          from django.urls import include, path
          from rest_framework import routers
          from .views import ScheduledTaskViewSet, TaskExecutionLogViewSet
          
          router = routers.DefaultRouter()
          router.register(r'tasks', ScheduledTaskViewSet)
          router.register(r'logs', TaskExecutionLogViewSet)
          
          urlpatterns = [
              path('', include(router.urls)),
          ]
          

          4. 项目 URL 配置(task_manager/urls.py)

          from django.contrib import admin
          from django.urls import path, include
          
          urlpatterns = [
              path('admin/', admin.site.urls),
              path('api/', include('tasks.urls')),
          ]
          

          五、创建示例任务

          定义任务函数(tasks/tasks.py)

          from celery import shared_task
          from .models import ScheduledTask, TaskExecutionLog
          import logging
          
          logger = logging.getLogger(__name__)
          
          @shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=3, retry_kwargs={'max_retries': 3})
          def sample_task(self, task_id):
              try:
                  task = ScheduledTask.objects.get(id=task_id)
                  
                  # 模拟任务执行
                  result = f"任务 {task.name} 执行成功,时间:{str(self.request.time_start)}"
                  
                  # 记录执行日志
                  TaskExecutionLog.objects.create(
                      task=task,
                      status='success',
                      result=result
                  )
                  
                  logger.info(f"任务执行成功: {task.name}")
                  return result
                  
              except Exception as e:
                  # 记录错误日志
                  task = ScheduledTask.objects.get(id=task_id) if ScheduledTask.objects.filter(id=task_id).exists() else None
                  if task:
                      TaskExecutionLog.objects.create(
                          task=task,
                          status='failed',
                          error_message=str(e)
                      )
                  logger.error(f"任务执行失败: {str(e)}")
                  raise
          

          六、启动服务

          1. 启动 Redis

          redis-server
          

          2. 启动 Celery Worker

          celery -A task_manager worker --loglevel=info --pool=prefork --concurrency=4
          

          3. 启动 Celery Beat

          celery -A task_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
          

          4. 启动 Django 开发服务器

          python manage.py runserver
          

          七、API 测试

          1. 创建周期性任务

          curl -X POST http://localhost:8000/api/tasks/ -d '{
              "name": "示例周期性任务",
              "task_type": "periodic",
              "task_function": "tasks.tasks.sample_task",
              "interval_seconds": 60,
              "is_active": true
          }' -H "Content-Type: application/json"
          

          2. 查看任务列表

          curl http://localhost:8000/api/tasks/
          

          3. 查看执行日志

          curl http://localhost:8000/api/logs/
          

          项目结构

          task_manager/
          ├── task_manager/
          │   ├── __init__.py
          │   ├── celery.py
          │   ├── settings.py
          │   ├── u编程rls.py
          │   └── wsgi.py
          ├── tasks/
          │   ├── migrations/
          │   ├── __init__.py
          │   ├── admin.py
          │   ├── apps.py
          │   ├── models.py
          │   ├── serializers.py
          │   ├── tasks.py
          │   ├── urls.py
          │   └── views.py
          ├── manage.py
          └── db.sqlite3
          

          关键特性说明

          • 动态任务管理:通过 API 创建 / 更新 / 删除定时任务
          • 任务执行记录:自动记录任务执行结果和状态
          • 失败重试机制:任务失败时自动重试(最多 3 次)
          • 多种调度方式:支持周期性任务和一次性任务
          • 可视化管理:通过 Django Admin 界面管理定时任务

          扩展建议

          • 添加任务参数支持,允许在创建任务时传递参数
          • 实现任务暂停 / 恢复功能
          • 添加任务优先级队列配置
          • 集成监控系统(如 Prometheus+Grafana)
          • 实现任务执行结果的异步通知(邮件、短信等)

          这个实现提供了一个完整的 Django+Celery 定时任务系统,支持动态管理和监控,可直接用于生产环境。

          总结

          以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜