Django集成Celery之状态监控与任务管理详解
目录
- 使用 Django 管理 Celery Worker
- 安装 Django 和相关包
- 创建 Django 项目和应用
- 配置 Django 和 Celery
- 创建一个 Celery 任务
- 注册自定义的 TaskResultAdmin
- 创建 Django Admin 界面的自定义模板
- 运行 Django 和 Celery
- 使用 Django Admin 管理 Celery 任务
- 启动 Django 本地的 Celery Worker
- 创建 Celery 任务
- 创建触发任务的视图
- 更新 Celery 配置
- 启动 Celery Worker 和 Django 服务器
- 触发任务并在 Django Admin 界面中查看
- 在 Django Admin 界面查看任务状态
- 启动远程的 Celery Worker
- 安装 Celery Worker
- 配置 Celery Worker
- 定义任务
- 启动 Celery Worker
- 启动和测试
- 使用 Django Admin 管理 Flask 启动的 Celery Worker 的常见问题
- 检查 Flask 应用的 Celery 配置
- 检查 Django 的 Celery 配置
- 同步数据库
- 确保 Django Admin 中注册了相关模型
- 测试 Celery 任务
- 检查 Celery Worker 配置
- 总结
如何通过 Django 来管理 Celery 任务?通过 Django Admin 界面提供任务的查询、查看、重试、终止等功能?下面是一个完整的步骤指南。
使用 Django 管理 Celery Worker
安装 Django 和相关包
首先,创建一个新的虚拟环境并安装所需的包。
python -m venv myenv source myenv/bin/activate # Windows 系统使用: myenv\Scripts\activate pip install django django-celery-results django-celery-beat celery
创建 Django 项目和应用
django-admin startproject myproject cd myproject django-admin startapp myapp
配置 Django 和 Celery
在 myproject/settings.py
文件中添加以下内容:
INSTALLED_APPS = [ ..., 'django_celery_results', 'django_celery_beat', 'myapp', # 确保 app 在这个列表里 ] CELERY_BROKER_URL = 'Redis://localhost:6379/0' # 使用 Redis 作为示例,可以根据需求更改 CELERY_RESULT_BACKEND = 'django-db' CELERY_CACHE_BACKEND = 'django-cache' CELERY_TRACK_STARTED = True CELERY_SEND_EVENTS = True # 确保已经配置了数据库 DATABASES = { 'default': { 'ENGINE': 'django.db.backends.SQLite3', 'NAME': BASE_DIR / 'db.sqlite3', } } # 配置 Django 缓存(可选) CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', } }
在 myproject
目录中创建一个 celery.py
文件:
from __future__ import absolute_import, unicode_literals import os from celery import Celery from django.conf import settings # 设置 Django 的配置模块 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') app = Celery('myproject') # 从 Django 的设置中配置 Celery app.config_from_object('django.conf:settings', namespace='CELERY') # 自动发现任务 # app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
修改 myproject/__init__.py
文件,使得 Django 在启动时加载 Celery:
from __future__ import absolute_import, unicode_literals # 这将确保当 Django 启动时加载 app.py from .celery import app as celery_app __all__ = ('celery_app',)
创建一个 Celery 任务
在 myapp/tasks.py
中创建一个简单的 Celery 任务:
from celery import shared_task @shared_task def add(x, y): return x + y
注册自定义的 TaskResultAdmin
我们需要在自定义 TaskResultAdmin
之前先取消已经注册的模型。
在 myapp/admin.py
中做如下修改:
from django.contrib import admin from django_celery_results.models import TaskResult from django_celery_results.admin import TaskResultAdmin as DefaultTaskResultAdmin from django.urls import path from django.shortcuts import redirect from celery.result import AsyncResult from myproject.celery import app # 取消已经注册的 TaskResult admin.site.unregister(TaskResult) # 创建一个自定义的 TaskResultAdmin 继承自默认的 TaskResultAdmin class CustomTaskResultAdmin(DefaultTaskResultAdmin): change_list_template = "admin/celery_task_changelist.html" def get_urls(self): urls = super().get_urls() custom_urls = [ path('retry/<task_id>/', self.admin_site.admin_view(self.retry_task), name='retry-task'), path('terminate/<task_id>/', self.admin_site.admin_view(self.terminate_task), name='terminate-task'), ] return custom_urls + urls def retry_task(self, request, task_id, *args, **kwargs): AsyncResult(task_id, app=app).reapply() self.message_user(request, f'Task {task_id} retried successfully.') return redirect('..') def terminate_task(self, request, task_id, *args, **kwargs): AsyncResult(task_id, app=app).revoke(terminate=True) self.message_user(request, f'Task {task_id} terminated successfully.') return redirect('..') # 注册自定义的 TaskResultAdmin admin.site.register(TaskResult, CustomTaskResultAdmin)
TaskResult
模型已经被 django_celery_results
自动注册到 Django Admin 中了。
我们可以通过继承 django_celery_results
的 TaskResultAdmin
并覆盖的方式来避免重复注册模型。
创建 Django Admin 界面的自定义模板
在 Django 项目中创建以下目录结构 templates/admin
并在 admin
文件夹内创建 celery_task_changelist.html
:
{% extends "admin/change_list.html" %} {% block result_list %} {{ block.super }} <script> function handleTask(action, task_id) { fetch(`/${action}/${task_id}/`, { method: 'POST', headers: { 'X-CSRFToken': document.querySelector('[name=csrfmiddlewaretoken]') .value, }, }).then((response) => { if (response.ok) { location.reload(); } else { alert('Action failed.'); } }); } </script> <div> <form method="post"> {% csrf_token %} {% for result in cl.result_list %} <button type="button" onclick="handleTask('retry', '{{ result.task_id }}')"> Retry </button> <button type="button" onclick="handleTask('terminate', '{{ result.task_id }}')" > Terminate </button> {% endfor %} </form> </div> {% endblock %}
确保自定义的模板路径正确。对于默认 Django 项目模板目录,模板文件夹应该在 myproject/templates/admin/celery_task_changelist.html
。
运行 Django 和 Celery
- 应用数据库迁移:
python manage.py migrate
- 启动 Django 服务器:
python manage.py runserver
- 启动 Celery worker:
celery -A myproject worker -l info
使用 Django Admin 管理 Celery 任务
打开浏览器并访问 http://127.0.0.1:8000/admin/
,Celery 任务将会在 Django admin 界面中显示,并且可以通过点击按钮来进行查询、查看、重试和终止等操作。
这样就完成了通过 Django Admin 界面管理 Celery 任务的完整步骤。如有需要可以进一步定制和优化界面和功能。
启动 Django 本地的 Celery Worker
为了在启动 Celery Worker 后向 Worker 发起任务,并在 Django Admin 界面演示查询、查看、重试和终止任务,可以按以下步骤进行操作:
创建 Celery 任务
在 myapp/tasks.py
中定义一些示例任务:
# myapp/tasks.py from celery import shared_task import time @shared_task def add(x, y): time.sleep(10) # 模拟长时间运行的任务 return x + y @shared_task def long_running_task(duration): time.sleep(duration) return f"Task completed after {duration} seconds"# myapp/tasks.py from celery import shared_task import time @shared_task def add(x, y): time.sleep(10) # 模拟长时间运行的任务 return x + y @shared_task def long_running_task(duration): time.sleep(duration) return f"Task completed after {duration} seconds"
创建触发任务的视图
为了便于演示,可以创建一些视图来触发这些任务。更新 urls.py
和 views.py
编程文件。
- 在
myapp/views.py
中:
# myapp/views.py from django.http import jsonResponse from myapp.tasks import add, long_running_task def trigger_add_task(request): add.delay(3, 4) return JsonResponse({'status': android'Task add (3, 4) triggered'}) def trigger_long_running_task(request): long_running_task.delay(30) # 任务运行30秒 return JsonResponse({'status': 'Long running task for 30 seconds triggered'})
- 在
myapp/urls.py
中:
# myapp/urls.py from django.urls import path from .views import trigger_add_task, trigger_long_running_task urlpatterns = [ path('trigger-add-task/', trigger_add_task, name='trigger-add-task'), path('trigger-long-task/', trigger_long_running_task, name='trigger-long-task'), ]
- 在
myproject/urls.py
中:
# myproject/urls.py from django.contrib import admin from django.urls import path, include urlpatterns = [ path(http://www.devze.com'admin/', admin.site.urls), path('tasks/', include('myapp.urls')), ]
更新 Celery 配置
确保 settings.py
中配置了 Celery:
# myproject/settings.py CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用 Redis 作为示例,可以根据需求更改 CELERY_RESULT_BACKEND = 'django-db'
启动 Celery Worker 和 Django 服务器
确保已经启动了 Redis 服务:
redis-server
然后分别启动 Django 服务器和 Celery Worker:
# 启动 Django 服务器 python manage.py runserver # 启动 Celery pythonWorker celery -A myproject worker -l info
触发任务并在 Django Admin 界面中查看
打开浏览器并访问以下 URL 以触发任务:
http://127.0.0.1:8000/tasks/trigger-add-task/
- 触发增加任务http://127.0.0.1:8000/tasks/trigger-long-task/
- 触发长时间运行任务
通过这些 URL 触发 Celery 任务。然后可以通过 Django Admin 界面进行查询、查看、重试和终止这些任务。
在 Django Admin 界面查看任务状态
打开浏览器并访问 http://127.0.0.1:8000/admin/
,登陆 Django Admin 界面,导航到 Task Results
部分。应该能看到适当的任务列表,并通过之前在自定义 TaskResultAdmin
中定义的操作进行重试和终止任务。
这些步骤能够通过 Django 和 Celery 演示触发任务并在 Django Admin 界面中进行查询、查看、重试、终止等操作。
启动远程的 Celery Worker
要通过 Django Admin 管理和监控在远程服务器上单独运行且由独立代码仓库维护的 Celery Worker,需要配置和协调多个独立的系统。
安装 Celery Worker
在远程服务器上,创建一个独立的项目(假设名字为 worker_project
),并安装所需的依赖:
# 在远程服务器上 python -m venv venv source venv/bin/activate pip install celery redis
配置 Celery Worker
- 在
worker_project
内部配置 Celery(worker_project/celery.py
):
from __future__ import absolute_import, jsunicode_literals import os from celery import Celery # 设置 Django 的配置模块 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'worker_project.settings') app = Celery('worker_project') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
- 在
worker_project/settings.py
中配置 Celery:
CELERY_BROKER_URL = 'redis://your_redis_server:6379/0' # 替换为实际的 Redis 地址 CELERY_RESULT_BACKEND = 'redis://your_redis_server:6379/0'
定义任务
创建一些测试任务(worker_project/tasks.py
):
from celery import shared_task import time @shared_task def add(x, y): time.sleep(10) # 模拟长时间运行的任务 return x + y @shared_task def long_running_task(duration): time.sleep(duration) return f"Task completed after {duration} seconds"
启动 Celery Worker
celery -A worker_project worker -l info
启动和测试
启动本地 Django 服务器:
python manage.py runserver
确保远程服务器上的 Celery Worker 已经在运行。
触发任务并在 Django Admin 中查看:
- 访问
http://127.0.0.1:8000/tasks/trigger-add-task/
- 触发增加任务 - 访问
http://127.0.0.1:8000/tasks/trigger-long-task/
- 触发长时间运行任务
通过 http://127.0.0.1:8000/admin/
登录 Django Admin 界面,导航到 Task Results
部分,您应该能看到这些任务并管理它们(如重试和终止)。
以上配置实现了在本地的 Django 项目中通过 Django Admin 管理和监控在远程服务器上单独运行的 Celery Worker,并通过 Redis 进行通信。这种架构可以在实际生产环境中更好地分离职责并提高系统的健壮性和扩展性。
使用 Django Admin 管理 Flask 启动的 Celery Worker 的常见问题
在使用 Flask App 启动远程 Celery Worker,并在 Django Admin 对这些 Worker 进行监控和管理时,可能会遇到诸如 Django Admin 界面没有显示 Celery Worker 任务和任务执行结果的问题,可能有以下几个原因:
- 结果后端配置错误:确保 Flask 和 Django 使用相同的结果后端(result backend)。
- Django 配置错误:确保 Django 已正确配置 Celery 结果后端。
- Flask 应用没有保存结果:确保 Flask 的 Celery 配置没有禁用结果保存功能。
要修复这个问题,请按以下步骤检查和修正设置:
检查 Flask 应用的 Celery 配置
确保 Flask 应用中的 Celery 配置了正确的结果后端,并且没有禁用任务结果的存储。例如:
celery_app = Celery( configs.celery.name, task_cls=FlaskTask, broker=app.config["CELERY_BROKER_URL"], backend=app.config["CELERY_BACKEND"], # 确保配置了结果后端 task_ignore_result=False, # 确保不忽略任务结果 ) celery_app.conf.update( result_backend=app.config["CELERY_RESULT_BACKEND"], # 确保配置了结果后端 broker_connection_retry_on_startup=True, ) # 确保没有不必要的配置禁用结果存储
检查 Django 的 Celery 配置
在 settings.py
中,确保定义了 Celery 结果后端,并且配置与 Flask 中的配置一致:
# Celery 配置 CELERY_BROKER_URL = 'redis://localhost:6379/0' # 替换为实际的 Broker URL CELERY_RESULT_BACKEND = 'django-db' # 使用 Django 数据库作为结果后端 CELERY_CACHE_BACKEND = 'django-cache' CELERY_RESULT_PERSISTENT = True # 安装的应用程序 INSTALLED_APPS = [ # 其他应用 'django_celery_results', 'django_celery_beat', ] # 其他配置
同步数据库
确保 Django 数据库与 Celery 结果模型一致:
python manage.py migrate django_celery_results python manage.py migrate django_celery_beat
确保 Django Admin 中注册了相关模型
确保在 admin.py
中注册了 django_celery_results
和 django_celery_beat
的模型,以便在 Admin 界面中查看:
from django.contrib import admin from django_celery_results.models import TaskResult from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule admin.site.register(TaskResult) admin.site.register(PeriodicTask) admin.site.register(IntervalSchedule) admin.site.register(CrontabSchedule)
测试 Celery 任务
确保从 Flask 发送的 Celery 任务能正确存储结果:
@celery_app.task(bind=True) def debug_task(self, *args, **kwargs): print(f'Request: {self.request!r}') return 'Test Result'
在 Flask 应用中调用这个任务:
debug_task.delay()
然后检查 Django Admin 界面中的任务结果是否显示。
检查 Celery Worker 配置
确保 celery worker
是在一个共享的 Broker 和 Backend 上运行:
celery -A your_flask_app_name worker --loglevel=info
通过这些步骤,应该能确保在 Django Admin 界面中正确显示 Flask 应用中 Celery Worker 发起的任务和任务执行结果。
如果问题仍然存在,检查日志和配置是否有任何错误,并确保 Flask 和 Django 的所有 Celery 配置和数据库访问是有效且一致的。
总结
通过 Django Admin 管理 Celery Worker 任务是一种方便的方式,可以通过简单的配置和定制来实现任务的查询、查看、重试和终止等操作。
通过本文提供的步骤和示例,您可以轻松地在 Django 项目中集成 Celery Worker,并通过 Django Admin 界面对任务进行管理和监控。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论