浅析Python如何实现Celery任务队列系统
目录
- 系统要求
- 系统架构
- 配置说明
- 功能模块
- 1. 基础运算任务
- 2. 文本处理任务
- 3. 系统监控任务
- 4. 报告生成任务
- 定时任务配置
- 使用说明
- 1. 启动系统
- 2. 系统监控
- 错误处理
- 注意事项
- 代码示例
系统要求
python 3.x
Redis 服务器
依赖包:
- celery==5.3.6
- redis==5.0.1
系统架构
系统主要由以下组件构成:
- 任务定义模块 (tasks.py):包含所有可执行的任务定义
- 主程序模块 (main.py):负责任务的调度和监控
- Redis 服务器:作为消息代理(Broker)和结果后端(hQXyBIZdoGResult Backend)
配置说明
Celery 配置
broker_url = 'redis://:123456@127.0.0.1:6379/1' result_backend = 'redis://:123456@127.0.0.1:6379/1'
主要配置项:
- 任务序列化:jsON
- 时区:Asia/Shanghai
- 工作进程数:1
功能模块
1. 基础运算任务
add(x, y): 加法运算
multiply(x, y): 乘法运算
chain_calculation(numbers): 链式计算(求和、平均值js、最大值、最小值)
2. 文本处理任务
process_text(text): 文本处理(大写转换、长度统计、单词计数)
3. 系统监控任务
system_monitor(): 每5秒执行一次,监控系统状态
- CPU使用率
- 内存使用率
- 系统状态
4. 报告生成任务
generate_report(): 生成实时报告
daily_report(): 每天早上9点生成日报
workday_task(): 工作日每小时执行的任务
定时任务配置
系统包含以下定时任务:
- 系统监控:每5秒执行一次
- 日报生成:每天早上9点执行
- 工作日任务:工作日(周一至周五)9:00-18:00每小时执行
使用说明
1. 启动系统
确保Redis服务器已启动
启动Celery工作进程:
celery -A tasks worker --loglevel=info
启动Celery Beat进程(用于定时任务):
celery -A tasks beat
运行主程序:
python main.py
2. 系统监控
主程序运行后会自动执行以下操作:
- 实时显示系统监控数据
- 执行常规任务示例
- 通过按下 Ctrl+C 可以优雅退出程序
错误处理
系统实现了完整的错误处理机制:
- 任务执行错误捕获和日志记录
- 优雅的程序退出处理
- 自动重试机制
注意事项
Redis连接配置需要根据实际环境修改
确保系统时区设置正确
建议在生产环境中调整工作进程数
监控数据目前为模拟数据,实际使用时需要替换为真实的系统监控指标
代码示例
任务执行示例:
# 执行加法任务 result = add.delay(4, 6) print(f"任务ID: {result.id}") if result.ready(): print(f"结果: {result.get()}")
系统监控示例
main.py
# 执行系统监控 result = system_monitor.delay() data = result.get() print(f"CPU使用率: {data['cpu_usage']:.1f}%") print(f"内存使用率: {data['memory_usage']:.1f}%") ### 所有代码: ```python from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor import time import json from datetime import datetime import threading import signal import sys from celery.result import AsyncResult # 全局变量控制程序运行 running = True def signal_handler(signum, frame): """处理退出信号""" global running print("\n收到退出信号,正在关闭程序...") running = False def monitor_system_task(): """监控系统任务的执行结果""" while running: try: # 执行系统监控任务 result = system_monitor.delay() # 等待结果(最多等待4秒) for _ in range(4): if result.ready(): data = result.get() if data: print(f"\n系统监控结果:") print(f"CPU使用率: {data['cpu_usage']:.1f}%") print(f"内存使用率: {data['memory_usage']:.1f}%") print(f"系统状态: {data['status']}") print("-" * 50) break time.sleep(1) # 等待剩余时间,确保大约每5秒执行一次 time.sleep(1) except Exception as e: print(f"监控任务出错: {e}") time.sleep(5) def run_regular_thttp://www.devze.comask(): """运行普通任务的示例""" while running: try: # 执行一些常规任务 print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 执行常规任务...") # 1. 执行加法任务 result = add.delay(4, 6) print(f"加法任务ID: {result.id}") if result.ready(): 编程客栈 print(f"4 + 6 = {result.get()}") # 2. 执行文本处理 text = f"这是一条测试消息 - {datetime.now()}" result = process_text.delay(text) print(f"文本处理任务ID: {result.id}") if result.ready(): print(json.dumps(result.get(), indent=2, ensure_ascii=False)) # 休眠5秒后继续下一轮 for _ in range(5): if not running: break time.sleep(1) except Exception as e: print(f"执行任务时出错: {e}") time.sleep(5) def main(): """主函数""" # 注册信号处理器(用于优雅退出) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) print("程序启动...") print("提示:按 Ctrl+C 可以优雅退出程序") print("\n=== 主程序开始运行 ===") print("- 系统监控每5秒执行一次") print("- 常规任务每5秒执行一次") print("- 所有任务的执行结果会实时显示") try: # 创建并启动监控线程 monitor_thread = threading.Thread(target=monitor_system_task) monitor_thread.daemon = True monitor_thread.start() # 创建并启动常规任务线程 task_thread = threading.Thread(target=run_regular_task) task_thread.daemon = True task_thread.start() # 主线程保持运行 while running: time.sleep(1) except KeyboardInterrupt: print("\n程序正在关闭...") finally: print("程序已退出。") if __name__ == "__main__": main()
tasks.py
from celery import Celery from celery.schedules import crontab import time from datetime import datetime import random # 创建 Celery 实例 app = Celery('tasks') # 配置 Celery app.conf.update( broker_url='redis://:123456@127.0.0.1:6379/1', result_backend='redis://:123456@127.0.0.1:6379/1', task_serializer='json', result_serializer='json', accept_content=['json'], timezone='Asia/Shanghai', enable_utc=True, worker_pool_restarts=True, worker_concurrency=1, ) # 配置定时任务 app.conf.beat_schedule = { # 每5秒执行一次系统监控 'monitor-every-5-seconds': { 'task': 'tasks.system_monitor', 'schedule': 5.0, # 每5秒执行一次 }, # 每天早上9点执行 'daily-morning-report': { 'task': 'tasks.daily_report', 'schedule': crontab(hour=9, minute=0), }, # 工作日每小时执行 'workday-hourly-task': { 'task': 'tasks.workday_task', 'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'), } } @app.task def add(x, y): """简单的加法任务""" time.sleep(1) return x + y @app.task def multiply(x, y): """乘法运算任务""" time.sleep(2) return x * y @app.task def process_text(text): """文本处理任务""" time.sleep(1) result = { 'original': text, 'upper': text.upper(), 'length': len(text), 'words': len(text.split()) } return result @app.task def generate_report(): """生成报告任务""" time.sleep(3) current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") data = { 'timestamp': current_time, 'temperature': random.uniform(20, 30), 'humidity': random.uniform(40, 80), 'status': random.choice(['正常', '警告', '错误']) } return data @app.task def chain_calculation(numbers): """链式计算任务""" time.sleep(2) result = sum(numbers) average = result / len(numbers) maximum = max(numbers) minimum = min(numbers) return { 'sum': result, 'average': average, 'max': maximum, 'min': minimum, 'count': len(numbers) } @app.task def system_monitor(): """每5秒执行一次的系统监控任务""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"\n[{current_time}] 每5秒执行一次的系统监控任务 执行系统监控...") # 模拟获取系统信息 data = { 'timestamp': current_time, 'cpu_usage': random.uniform(0, 100), 'memory_usage': random.uniform(0, 100), 'status': 'running' } # 打印监控信息 print(f"CPU使用率: {data['cpu_usage']:.1f}%") print(f"内存使用率: {data['memory_usage']:.1f}%") print(f"系统状态: {data['status']}") print("-" * 50) return data @app.task def daily_report(): """每天早上9点执行的日报任务""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"\n[{current_time}] 生成每日报告...") report = { 'report_type': '日报', 'generated_at': current_time, 'summary': '这是一个自动生成的日报示例', 'metrics': { 'total_tasks': random.randint(100, 1000), 'completed_tasks': random.randint(50, 500), 'success_rate': random.uniform(0.8, 1.0) } } print(f"报告类型: {report['report_type']}") print(f"生成时间: {report['generated_at']}") print(f"任务完成率: {report['metricsandroid']['success_rate']:.1%}") print("-" * 50) return report @app.task def workday_task(): """工作日每小时执行的任务""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"\n[{current_time}] 执行工作时间任务...") data = { 'task_type': '工作时间任务', 'executed_at': current_time, 'status': random.choice(['完成', '进行中', '计划中']), 'workload': random.randint(1, 100) } print(f"任务状态: {data['status']}") print(f"工作负载: {data['workload']}%") print("-" * 50) return data
以上就是浅析Python如何实现Celery任务队列系统的详细内容,更多关于Python Celery任务队列的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论