开发者

浅析Python如何实现Celery任务队列系统

目录
  • 系统要求
  • 系统架构
  • 配置说明
  • 功能模块
    • 1. 基础运算任务
    • 2. 文本处理任务
    • 3. 系统监控任务
    • 4. 报告生成任务
  • 定时任务配置
    • 使用说明
      • 1. 启动系统
      • 2. 系统监控
    • 错误处理
      • 注意事项
        • 代码示例

          系统要求

          python 3.x

          Redis 服务器

          依赖包:

          • celery==5.3.6
          • redis==5.0.1

          系统架构

          系统主要由以下组件构成:

          • 任务定义模块 (tasks.py):包含所有可执行的任务定义
          • 主程序模块 (main.py):负责任务的调度和监控
          • Redis 服务器:作为消息代理(Broker)和结果后端(hQXyBIZdoGResult Backend)

          配置说明

          Celery 配置

          broker_url = 'redis://:123456@127.0.0.1:6379/1'
          result_backend = 'redis://:123456@127.0.0.1:6379/1'
          

          主要配置项:

          • 任务序列化:jsON
          • 时区:Asia/Shanghai
          • 工作进程数:1

          功能模块

          1. 基础运算任务

          add(x, y): 加法运算

          multiply(x, y): 乘法运算

          chain_calculation(numbers): 链式计算(求和、平均值js、最大值、最小值)

          2. 文本处理任务

          process_text(text): 文本处理(大写转换、长度统计、单词计数)

          3. 系统监控任务

          system_monitor(): 每5秒执行一次,监控系统状态

          • CPU使用率
          • 内存使用率
          • 系统状态

          4. 报告生成任务

          generate_report(): 生成实时报告

          daily_report(): 每天早上9点生成日报

          workday_task(): 工作日每小时执行的任务

          定时任务配置

          系统包含以下定时任务:

          • 系统监控:每5秒执行一次
          • 日报生成:每天早上9点执行
          • 工作日任务:工作日(周一至周五)9:00-18:00每小时执行

          使用说明

          1. 启动系统

          确保Redis服务器已启动

          启动Celery工作进程:

          celery -A tasks worker --loglevel=info
          

          启动Celery Beat进程(用于定时任务):

          celery -A tasks beat
          

          运行主程序:

          python main.py
          

          2. 系统监控

          主程序运行后会自动执行以下操作:

          • 实时显示系统监控数据
          • 执行常规任务示例
          • 通过按下 Ctrl+C 可以优雅退出程序

          错误处理

          系统实现了完整的错误处理机制:

          • 任务执行错误捕获和日志记录
          • 优雅的程序退出处理
          • 自动重试机制

          注意事项

          Redis连接配置需要根据实际环境修改

          确保系统时区设置正确

          建议在生产环境中调整工作进程数

          监控数据目前为模拟数据,实际使用时需要替换为真实的系统监控指标

          代码示例

          任务执行示例:

          # 执行加法任务
          result = add.delay(4, 6)
          print(f"任务ID: {result.id}")
          if result.ready():
              print(f"结果: {result.get()}")
          

          系统监控示例

          main.py

          # 执行系统监控
          result = system_monitor.delay()
          data = result.get()
          print(f"CPU使用率: {data['cpu_usage']:.1f}%")
          print(f"内存使用率: {data['memory_usage']:.1f}%")
          ### 所有代码:
          
          ```python
          from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor
          import time
          import json
          from datetime import datetime
          import threading
          import signal
          import sys
          from celery.result import AsyncResult
          
          # 全局变量控制程序运行
          running = True
          
          def signal_handler(signum, frame):
              """处理退出信号"""
              global running
              print("\n收到退出信号,正在关闭程序...")
              running = False
          
          def monitor_system_task():
              """监控系统任务的执行结果"""
              while running:
                  try:
                      # 执行系统监控任务
                      result = system_monitor.delay()
                      
                      # 等待结果(最多等待4秒)
                      for _ in range(4):
                          if result.ready():
                              data = result.get()
                              if data:
                                  print(f"\n系统监控结果:")
                                  print(f"CPU使用率: {data['cpu_usage']:.1f}%")
                                  print(f"内存使用率: {data['memory_usage']:.1f}%")
                                  print(f"系统状态: {data['status']}")
                                  print("-" * 50)
                              break
                          time.sleep(1)
                      
                      # 等待剩余时间,确保大约每5秒执行一次
                      time.sleep(1)
                      
                  except Exception as e:
                      print(f"监控任务出错: {e}")
                      time.sleep(5)
          
          def run_regular_thttp://www.devze.comask():
              """运行普通任务的示例"""
              while running:
                  try:
                      # 执行一些常规任务
                      print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 执行常规任务...")
                      
                      # 1. 执行加法任务
                      result = add.delay(4, 6)
                      print(f"加法任务ID: {result.id}")
                      if result.ready():
                  编程客栈        print(f"4 + 6 = {result.get()}")
                      
                      # 2. 执行文本处理
                      text = f"这是一条测试消息 - {datetime.now()}"
                      result = process_text.delay(text)
                      print(f"文本处理任务ID: {result.id}")
                      if result.ready():
                          print(json.dumps(result.get(), indent=2, ensure_ascii=False))
                      
                      # 休眠5秒后继续下一轮
                      for _ in range(5):
                          if not running:
                              break
                          time.sleep(1)
                          
                  except Exception as e:
                      print(f"执行任务时出错: {e}")
                      time.sleep(5)
          
          def main():
              """主函数"""
              # 注册信号处理器(用于优雅退出)
              signal.signal(signal.SIGINT, signal_handler)
              signal.signal(signal.SIGTERM, signal_handler)
              
              print("程序启动...")
              print("提示:按 Ctrl+C 可以优雅退出程序")
              print("\n=== 主程序开始运行 ===")
              print("- 系统监控每5秒执行一次")
              print("- 常规任务每5秒执行一次")
              print("- 所有任务的执行结果会实时显示")
              
              try:
                  # 创建并启动监控线程
                  monitor_thread = threading.Thread(target=monitor_system_task)
                  monitor_thread.daemon = True
                  monitor_thread.start()
                  
                  # 创建并启动常规任务线程
                  task_thread = threading.Thread(target=run_regular_task)
                  task_thread.daemon = True
                  task_thread.start()
                  
                  # 主线程保持运行
                  while running:
                      time.sleep(1)
                      
              except KeyboardInterrupt:
                  print("\n程序正在关闭...")
              finally:
                  print("程序已退出。")
          
          if __name__ == "__main__":
              main()
          

          tasks.py

          from celery import Celery
          from celery.schedules import crontab
          import time
          from datetime import datetime
          import random
          
          # 创建 Celery 实例
          app = Celery('tasks')
          
          # 配置 Celery
          app.conf.update(
              broker_url='redis://:123456@127.0.0.1:6379/1',
              result_backend='redis://:123456@127.0.0.1:6379/1',
              task_serializer='json',
              result_serializer='json',
              accept_content=['json'],
              timezone='Asia/Shanghai',
              enable_utc=True,
              worker_pool_restarts=True,
              worker_concurrency=1,
          )
          
          # 配置定时任务
          app.conf.beat_schedule = {
              # 每5秒执行一次系统监控
              'monitor-every-5-seconds': {
                  'task': 'tasks.system_monitor',
                  'schedule': 5.0,  # 每5秒执行一次
              },
              # 每天早上9点执行
              'daily-morning-report': {
                  'task': 'tasks.daily_report',
                  'schedule': crontab(hour=9, minute=0),
              },
              # 工作日每小时执行
              'workday-hourly-task': {
                  'task': 'tasks.workday_task',
                  'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'),
              }
          }
          
          @app.task
          def add(x, y):
              """简单的加法任务"""
              time.sleep(1)
              return x + y
          
          @app.task
          def multiply(x, y):
              """乘法运算任务"""
              time.sleep(2)
              return x * y
          
          @app.task
          def process_text(text):
              """文本处理任务"""
              time.sleep(1)
              result = {
                  'original': text,
                  'upper': text.upper(),
                  'length': len(text),
                  'words': len(text.split())
              }
              return result
          
          @app.task
          def generate_report():
              """生成报告任务"""
              time.sleep(3)
              current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
              data = {
                  'timestamp': current_time,
                  'temperature': random.uniform(20, 30),
                  'humidity': random.uniform(40, 80),
                  'status': random.choice(['正常', '警告', '错误'])
              }
              return data
          
          @app.task
          def chain_calculation(numbers):
              """链式计算任务"""
              time.sleep(2)
              result = sum(numbers)
              average = result / len(numbers)
              maximum = max(numbers)
              minimum = min(numbers)
              return {
                  'sum': result,
                  'average': average,
                  'max': maximum,
                  'min': minimum,
                  'count': len(numbers)
              }
          
          @app.task
          def system_monitor():
              """每5秒执行一次的系统监控任务"""
          
              current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
              print(f"\n[{current_time}] 每5秒执行一次的系统监控任务 执行系统监控...")
              
              # 模拟获取系统信息
              data = {
                  'timestamp': current_time,
                  'cpu_usage': random.uniform(0, 100),
                  'memory_usage': random.uniform(0, 100),
                  'status': 'running'
              }
              
              # 打印监控信息
              print(f"CPU使用率: {data['cpu_usage']:.1f}%")
              print(f"内存使用率: {data['memory_usage']:.1f}%")
              print(f"系统状态: {data['status']}")
              print("-" * 50)
              
              return data
          
          @app.task
          def daily_report():
              """每天早上9点执行的日报任务"""
              current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
              print(f"\n[{current_time}] 生成每日报告...")
              
              report = {
                  'report_type': '日报',
                  'generated_at': current_time,
                  'summary': '这是一个自动生成的日报示例',
                  'metrics': {
                      'total_tasks': random.randint(100, 1000),
                      'completed_tasks': random.randint(50, 500),
                      'success_rate': random.uniform(0.8, 1.0)
                  }
              }
              
              print(f"报告类型: {report['report_type']}")
              print(f"生成时间: {report['generated_at']}")
              print(f"任务完成率: {report['metricsandroid']['success_rate']:.1%}")
              print("-" * 50)
              
              return report
          
          @app.task
          def workday_task():
              """工作日每小时执行的任务"""
              current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
              print(f"\n[{current_time}] 执行工作时间任务...")
              
              data = {
                  'task_type': '工作时间任务',
                  'executed_at': current_time,
                  'status': random.choice(['完成', '进行中', '计划中']),
                  'workload': random.randint(1, 100)
              }
              
              print(f"任务状态: {data['status']}")
              print(f"工作负载: {data['workload']}%")
              print("-" * 50)
              
              return data
          

          以上就是浅析Python如何实现Celery任务队列系统的详细内容,更多关于Python Celery任务队列的资料请关注编程客栈(www.devze.com)其它相关文章!

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜