开发者

django+celery如何实现定时拉取阿里云rocketmq实例信息

目录
  • 一、项目初始化
    • 1. 创建虚拟环境并安装依赖
    • 2. 创建 Django 项目和应用
    • 3. 配置 mysql 数据库(rocketmq_manager/settings.py)
    • 4. 配置项目其他设置(rocketmq_manager/settings.py)
  • 二、Celery 集成配置
    • 1. 创建 Celery 应用(rocketmq_manager/celery.py)
    • 2. 初始化 Celery(rocketmq_manager/__init__.py)
  • 三、Model 开发
    • 创建 RocketMQ 实例模型(rocketmq/models.py)
    • 迁移数据库
  • 四、定时任务代码
    • 创建阿里云 API 客户端(rocketmq/aliyun_client.py)
    • 定义定时任务(rocketmq/tasks.py)
  • 五、接口开发
    • 1. 创建序列化器(rocketmq/serializers.py)
    • 2. 创建视图集(rocketmq/views.py)
    • 3. 配置 URL(rocketmq/urls.py)
    • 4. 项目 URL 配置(rocketmq_manager/urls.py)
  • 六、配置定时任务
    • 在settings.py中添加定时任务配置
  • 七、启动服务
    • 1. 设置环境变量
    • 2. 启动 Redis
    • 3. 启动 Celery Worker
    • 4. 启动 Celery Beat
    • 5. 启动 Django 开发服务器
  • 八、API 测试
    • 1. 获取认证令牌
    • 2. 获取 RocketMQ 实例列表
    • 3. 获取同步日志
    • 4. 手动触发同步
  • 项目结构
    • 关键特性说明
      • 扩展建议
        • 总结

          一、项目初始化

          1. 创建虚拟环境并安装依赖

          # 创建虚拟环境
          python3 -m venv env
          source env/bin/activate
          
          # 安装依赖
          pip install django celery redis django-celery-beat aliyun-python-sdk-core-v3 aliyun-python-sdk-mq mysqlclient
          

          2. 创建 Django 项目和应用

          # 创建项目
          django-admin startproject rocketmq_manager
          cd rocketmq_manager
          
          # 创建应用
          python manage.py startapp rocketmq
          

          3. 配置 MySQL 数据库(rocketmq_manager/settings.py)

          DATABASES = {
              'default': {
                  'ENGINE': 'django.db.backends.mysql',
                  'NAME': 'rocketmq_manager',      # 数据库名
                  'USER': 'your_username',         # 用户名
                  'PASSWORD': 'your_password',     # 密码
                  'HOST': 'localhost',             # 主机
                  'PORT': '3306',                  # 端口
                  'OPTIONS': {
                      'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
                  },
              }
          }
          

          4. 配置项目其他设置(rocketmq_manager/settings.py)

          INSTALLED_APPS = [
              # ...
              'django_celery_beat',
              'django_celery_res编程ults',
              'rocketmq',
          ]
          
          # Celery配置
          CELERY_BROKER_URL = 'redis://localhost:6379/0'
          CELERY_RESULT_BACKEND = 'django-db'
          CELERY_ACCEPT_CONTENT = ['json']
          CELERY_TASK_SERIALIZER = 'json'
          CELERY_RESULT_SERIALIZER = 'json'
          CELERY_TIMEZONE = 'Asia/Shanghai'
          
          # 阿里云配置(从环境变量获取)
          ALIYUN_Access_KEY_ID = os.environ.get('ALIYUN_ACCESS_KEY_ID')
          ALIYUN_ACCESS_KEY_SECRET = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
          ALIYUN_REGION_ID = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')
          

          二、Celery 集成配置

          1. 创建 Celery 应用(rocketmq_manager/celery.py)

          from __future__ import absolute_import, unicode_literals
          import os
          from celery import Celery
          
          os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'rocketmq_manager.settings')
          
          app = Celery('rocketmq_manager')
          app.config_from_object('django.conf:settings', namespace='CELERY')
          app.autodiscover_tasks()
          
          @app.task(bind=True)
          def debug_task(self):
              print(f'Request: {self.request!r}')
          

          2. 初始化 Celery(rocketmq_manager/__init__.py)

          from __future__ import absolute_import, unicode_literals
          from .celery import app as celery_app
          
          __all__ = ('celery_app',)
          

          三、Model 开发

          创建 RocketMQ 实例模型(rocketmq/models.py)

          python

          运行

          from django.db import models
          from django.utils import timezone
          
          class RocketMQInstance(models.Model):
              instance_id = models.CharField('实例ID', max_length=100, unique=True)
              instance_name = models.CharField('实例名称', max_length=200, blank=True, null=True)
              instance_type = models.CharField('实例类型', max_length=50, blank=True, null=True)
              region_id = models.CharField('区域ID', max_length=50)
              status = models.CharField('状态', max_length=50, blank=True, null=True)
              create_time = models.DateTimeField('创建时间', blank=True, null=True)
              expire_time = models.DateTimeField('过期时间', blank=True, null=True)
              tags = models.JSONField('标签', blank=True, null=True)
              last_updated = models.DateTimewww.devze.comField('最后更新时间', auto_now=True)
              
              def __str__(self):
                  return f"{self.instance_name} ({self.instance_id})"
              
              class Meta:
                  verbose_name = 'RocketMQ实例'
                  verbose_name_plural = 'RocketMQ实例列表'
                  indexes = [
                      models.Index(fields=['instance_id', 'region_id']),
                  ]
          
          class InstanceSyncLog(models.Model):
              sync_time = models.DateTimeField('同步时间', auto_now_add=True)
              instance_count = models.IntegerField('实例数量', default=0)
              success = models.BooleanField('是否成功', default=True)
              error_message = models.TextField('错误信息', blank=True, null=True)
              execution_time = models.FloatField('执行时间(秒)', blank=True, null=True)
              
              def __str__(self):
                  return f"同步记录 - {self.sync_time}"
              
              class Meta:
                  verbose_name = '实例同步日志'
                  verbose_name_plural = '实例同步日志列表'
                  ordering = ['-sync_time']
          

          迁移数据库

          python manage.py makemigrations
          python manage.py migrate
          

          四、定时任务代码

          创建阿里云 API 客户端(rocketmq/aliyun_client.py)

          import os
          from aliyunsdkcore.client import AcsClient
          from aliyunsdkcore.acs_exception.exceptions import ClientException
          from aliyunsdkcore.acs_exception.exceptions import ServerException
          from aliyunsdkmq.model.v20190513 import DescribeInstancesRequest
          import json
          import time
          
          class AliyunRocketMQClient:
              def __init__(self):
                  self.access_key_id = os.environ.get('ALIYUN_ACCESS_KEY_ID')
                  self.access_key_secret = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
                  self.region_id = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')
                  self.client = AcsClient(self.access_key_id, self.access_key_secret, self.region_id)
              
              def get_instances(self):
                  try:
                      request = DescribeInstancesRequest.DescribeInstancesRequest()
                      request.set_accept_format('json')
                      
                      # 添加重试机制
                      max_retries = 3
                      for attempt in range(max_retries):
                          try:
                              response = self.client.do_action_with_exception(request)
                              return json.loads(response)
                          except (ClientException, ServerException) as e:
                              if attempt < max_retries - 1:
                                  wait_time = (attempt + 1) * 2
                                  print(f"请求失败,{wait_time}秒后重试: {str(e)}")
                                  time.sleep(wait_time)
                              else:
                                  raise
                      
                  except Exception as e:
                      print(f"获取实例信息失败: {str(e)}")
                      raise
          

          定义定时任务(rocketmq/tasks.py)

          from celery import shared_task
          from .models import RocketMQInstance, InstanceSyncLog
          from .aliyun_client import AliyunRocketMQClient
          import logging
          from datetime import datetime
          import time
          
          logger = logging.getLogger(__name__)
          
          @shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
          def sync_rocketmq_instances(self):
              start_time = time.time()
              try:
                  client = AliyunRocketMQClient()
                  response = client.get_instances()
                  
                  # 处理响应数据
                  instance_list = []
                  if 'Data' in response and 'InstanceDoList' in response['Data']:
                      for item in response['Data']['InstanceDoList']:
                          instance = {
                              'instance_id': item.get('InstanceId', ''),
                              'instance_name': item.get('InstanceName', ''),
                              'instance_type': item.get('InstanceType', ''),
                              'region_id': item.get('RegionId', ''),
                              'status': item.get('InstanceStatus', ''),
                              'create_time': datetime.fromtimestamp(item.get('CreateTime', 0) / 1000) if item.get('CreateTime') else None,
                              'expire_time': datetime.fromtimestamp(item.get('ExpireTime', 0) / 1000) if item.get('ExpireTime') else None,
                              'tags': item.get('Tags', {})
                          }
                          instance_list.append(instance)
                  
                  # 使用事务批量更新数据库
                  from django.db import transaction
                  with transaction.atomic():
                      # 先删除不存在的实例(可选)
                      # existing_ids = [item['instance_id'] for item in instance_list]
                      # RocketMQInstance.objects.exclude(instance_id__in=existing_ids).delete()
                      
                      # 批量更新或创建实例
                      for instance_data in instance_list:
                          RocketMQInstance.objects.update_or_create(
                              instance_id=instance_data['instance_id'],
                              defaults=instance_data
                          )
                  
                  execution_time = time.time() - start_time
                  
                  # 记录同步日志
                  log = InstanceSyncLog.objects.create(
                      instance_count=len(instance_list),
                      success=True,
                      execution_time=execution_time
                  )
                  
                  logger.info(f"成功同步 {len(instance_list)} 个RocketMQ实例,耗时: {execution_time:.2f}秒")
                  return f"同步完成,共 {len(instance_list)} 个实例,耗时: {execution_time:.2f}秒"
                  
              except Exception as e:
                  execution_time = time.time() - start_time
                  
                  # 记录错误日志
                  InstanceSyncLog.objects.create(
                      success=False,
                      error_message=str(e),
                      execution_time=execution_time
                  )
                  
                  logger.error(f"同步RocketMQ实例失败: {str(e)},耗时: {execution_time:.2f}秒")
                  raise
          

          五、接口开发

          1. 创建序列化器(rocketmq/serializers.py)

          from rest_framework import serializers
          from .models import RocketMQInstance, InstanceSyncLog
          
          class RocketMQInstanceSerializer(serializers.ModelSerializer):
              class Meta:
                  model = RocketMQInstance
                  fields = '__all__'
                  read_only_fields = ['last_updated']
          
          class InstanceSyncLogSerializer(serializers.ModelSerializer):
              class Meta:
                  model = InstanceSyncLog
                  fields = '__all__'
                  read_only_fields = ['sync_time', 'instance_count', 'successphp', 'error_message', 'execution_time']
          

          2. 创建视图集(rocketmq/views.py)

          from rest_framework import viewsets, status
          from rest_framework.response import Response
          from .models import RocketMQInstance, InstanceSyncLog
          from .serializers import RocketMQInstanceSerializer, InstanceSyncLogSerializer
          from .tasks import sync_rocketmq_instances
          from rest_framework.decorators import action
          from rest_framework.permissions import IsAuthenticated
          from rest_framework.authentication import TokenAuthentication
          
          class RocketMQInstanceViewSet(viewsets.ModelViewSet):
              queryset = RocketMQInstance.objects.all()
              serializer_class = RocketMQInstanceSerializer
              authentication_classes = [TokenAuthentication]
              permission_classes = [IsAuthenticated]
              
              @action(detail=False, methods=['post'])
              def sync_now(self编程客栈, request):
                  """立即触发实例同步"""
                  task = sync_rocketmq_instances.delay()
                  return Response({'task_id': task.id, 'message': '同步任务已启动'}, status=status.HTTP_202_ACCEPTED)
              
              @action(detail=False, methods=['get'])
              def regions(self, request):
                  """获取所有区域列表"""
                  regions = RocketMQInstance.objects.values_list('region_id', flat=True).distinct()
                  return Response(regions, status=status.HTTP_200_OK)
          
          class InstanceSyncLogViewSet(viewsets.ReadOnlyModelViewSet):
              queryset = InstanceSyncLog.objects.all().order_by('-sync_time')
              serializer_class = InstanceSyncLogSerializer
              authentication_classes = [TokenAuthentication]
              permission_classes = [IsAuthenticated]
          

          3. 配置 URL(rocketmq/urls.py)

          from django.urls import include, patphph
          from rest_framework import routers
          from .views import RocketMQInstanceViewSet, InstanceSyncLogViewSet
          
          router = routers.DefaultRouter()
          router.register(r'instances', RocketMQInstanceViewSet)
          router.register(r'sync-logs', InstanceSyncLogViewSet)
          
          urlpatterns = [
              path('', include(router.urls)),
          ]
          

          4. 项目 URL 配置(rocketmq_manager/urls.py)

          from django.contrib import admin
          from django.urls import path, include
          from rest_framework.authtoken.views import obtain_auth_token
          
          urlpatterns = [
              path('admin/', admin.site.urls),
              path('api/', include('rocketmq.urls')),
              path('api/token/', obtain_auth_token, name='api_token_auth'),  # 获取认证令牌
          ]
          

          六、配置定时任务

          在settings.py中添加定时任务配置

          CELERY_BEAT_SCHEDULE = {
              'sync-rocketmq-instances': {
                  'task': 'rocketmq.tasks.sync_rocketmq_instances',
                  'schedule': 3600.0,  # 每小时执行一次
                  'args': ()
              },
          }
          

          七、启动服务

          1. 设置环境变量

          export ALIYUN_ACCESS_KEY_ID=your_access_key_id
          export ALIYUN_ACCESS_KEY_SECRET=your_access_key_secret
          export ALIYUN_REGION_ID=cn-hangzhou  # 根据实际情况修改
          

          2. 启动 Redis

          redis-server
          

          3. 启动 Celery Worker

          celery -A rocketmq_manager worker --loglevel=info --pool=prefork --concurrency=4
          

          4. 启动 Celery Beat

          celery -A rocketmq_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
          

          5. 启动 Django 开发服务器

          python manage.py runserver
          

          八、API 测试

          1. 获取认证令牌

          curl -X POST -d "username=your_username&password=your_password" http://localhost:8000/api/token/
          

          2. 获取 RocketMQ 实例列表

          curl -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/
          

          3. 获取同步日志

          curl -H "Authorization: Token your_token_here" http://localhost:8000/api/sync-logs/
          

          4. 手动触发同步

          curl -X POST -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/sync_now/
          

          项目结构

          rocketmq_manager/
          ├── rocketmq_manager/
          │   ├── __init__.py
          │   ├── celery.py
          │   ├── settings.py
          │   ├── urls.py
          │   └── wsgi.py
          ├── rocketmq/
          │   ├── migrations/
          │   ├── __init__.py
          │   ├── admin.py
          │   ├── apps.py
          │   ├── aliyun_client.py
          │   ├── models.py
          │   ├── serializers.py
          │   ├── tasks.py
          │   ├── urls.py
          │   └── views.py
          ├── manage.py
          └── db.SQLite3
          

          关键特性说明

          1. MySQL 存储:使用 MySQL 数据库存储 RocketMQ 实例信息和同步日志
          2. 定时同步:每小时自动拉取阿里云 RocketMQ 实例信息
          3. 数据持久化:将实例信息存储到数据库,支持索引加速查询
          4. 手动触发:提供 API 接口支持手动触发同步
          5. 错误处理:任务失败自动重试,记录详细的同步日志和执行时间
          6. 权限控制:使用 Token 认证保护 API 接口

          扩展建议

          1. 添加更多阿里云 API 调用,获取更详细的实例指标(如 TPS、消息堆积量等)
          2. 实现多区域支持,同时监控多个地域的 RocketMQ 实例
          3. 添加告警机制,当实例状态异常或同步失败时发送通知
          4. 集成缓存系统(如 Redis),提高接口响应速度
          5. 添加 API 限流功能,防止恶意请求
          6. 实现实例信息的导出功能,支持数据报表生成

          这个实现提供了一个完整的 Django+Celery 定时拉取阿里云 RocketMQ 实例信息的解决方案,使用 MySQL 存储数据,支持权限控制和手动触发同步,可直接用于生产环境。

          总结

          以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜