开发者

RocketMQ 消息Message的结构和使用方式详解

目录
  • RocketMQ 消息(Message)详解
    • 一、什么是 Message?
      • ✅ 定义:
    • 二、Message 的核心结构
      • 三、Message 各字段详解
        • 1.Topic(主题)
        • 2.Body(消息体)
        • 3.Tags(标签)
        • 4.Keys(消息键)
        • 5.Properties(属性)
        • 6.DelayTimeLevel(延迟级别)
      • 四、Message 的生命周期
        • 五、Message 的存储机制
          • 1.http://www.devze.comCommitLog
          • 2.ConsumeQueue
          • 3.IndexFile
        • 六、Message 的发送方式回顾
          • 七、最佳实践与注意事http://www.devze.com
            • 八、常见问题排查
              • ✅ 总结:Message 核心要点

              RocketMQ 消息(Message)详解

              在 Apache RocketMQ 中,消息(Message) 是数据传输的最小单元,是生产者与消费者之间通信的“载体”。理解 Message 的结构、属性、生命周期和使用方式,是掌握 RocketMQ 的核心基础。

              推荐阅读:深入理解Apache RocketMQ 中Message 消息的核心概念

              一、什么是 Message?

              ✅ 定义:

              Mes编程sage 是 RocketMQ 中封装实际业务数据的对象,包含消息体(Body)和一系列元数据(如 Topic、Tag、Key、Properties 等),用于在生产者与消费者之间传递信息。

              类比:就像一封信,信纸是内容(Body),信封上写着收件人(Topic)、标签(Tag)、编号(Key)等信息。

              二、Message 的核心结构

              一个 Message 对象主要由以下几个部分组成:

              字段类型是否必填说明
              TopicString✅ 必填消息所属的主题,用于路由和分类
              Bodybyte[]✅ 必填消息的实际内容,通常为序列化后的 jsON、Protobuf 等
              TagsString❌ 可选子分类标签,用于消费者过滤(如 CREATE, CANCEL
              KeysString❌ 可选消息的唯一键或业务主键(如订单号),用于排查、索引
              Flagint❌ 可选消息标志位(如是否压缩)
              DelayTimeLevelint❌ 可选延迟消息级别(1~18),实现定时投递
              PropertiesMap<String, String>❌ 可选自定义属性,RocketMQ 内部也使用它存储系统属性

              三、Message 各字段详解

              1.Topic(主题)

              • 消息的逻辑分类,决定消息被发送到哪个队列。
              • 必须提前创建或允许自动创建。
              • 示例:ORDER_TOPIC, USER_LOG_TOPIC
              new Message("ORDER_TOPIC", ...);
              

              2.Body(消息体)

              • 实际传输的数据,必须是字节数组。
              • 通常通过 JSON、Protobuf、Hessian 等序列化框架编码。
              String content = "{\"orderId\":\"1001\",\"userId\":10086}";
              Message msg = new Message(topic, tag, content.getBytes(StandardCharsets.UTF_8));
              

              ⚠️ 注意:

              • 单条消息大小默认最大 4MB(可配置)
              • 过大消息会影响性能,建议拆分或使用外部存储(如上传文件后传 URL)

              3.Tags(标签)

              • 用于对同一 Topic 下的消息进行二次分类
              • 消费者可通过 subscribe("Topic", "TagA || TagB") 进行过滤。
              // 发送
              new Message("ORDER_TOPIC", "CREATE", "创建订单".getBytes());
              new Message("ORDER_TOPIC", "PAY", "支付完成".getBytes());
              // 订阅 CREATE 类型消息
              consumer.subscribe("ORDER_TOPIC", "CREATE");

              ✅ 优势:轻量级过滤,避免消费者接收无关消息。

              ⚠️ 注意:Tags 是字符串匹配,不支持正则(但支持 * 通配和 || 多选)

              4.Keys(消息键)

              • 为消息设置唯一标识或业务主键(如订单号、用户ID)。
              • 支持通过 mqadmin queryMsgByKey 命令查询消息。
              • 支持索引,便于问题排查。
              Message msg = new Message(...);
              msg.setKeys("ORDER_20240501001");
              

              ✅ 建议:关键业务消息务必设置 Keys,便于追踪。

              5.Properties(属性)

              • 键值对形式的扩展字段,可用于:
                • 存储自定义上下文(如 traceId、tenantId)
                • RocketMQ 内部使用(如 RECONSUME_TIMEDELAYTRAN_MSG
              msg.putUserProperty("traceId", "abc123");
              msg.putUserProperty("source", "web");
              

              ⚠️ 注意:系统属性以 PREFIX_SYS_PROP 开头,不要冲突。

              6.DelayTimeLevel(延迟级别)

              • 设置消息延迟投递时间,实现“定时任务”功能。
              • 取值范围:1~18,对应不同延迟时间:
              级别时间
              11s
              25s
              310s
              430s
              51m
              62m
              73m
              84m
              95m
              106m
              117m
              128m
              139m
              1410m
              1520m
              1630m
              171h
              182h
              Message msg = new Message("DELAY_TOPIC", "TAG", "延迟消息".getBytes());
              msg.setDelayTimeLevel(5); // 延迟1分钟
              producer.send(msg);
              

              ⚠️ 注意:延迟消息不保证精确时间,存在轻微误差。

              四、Message 的生命周期

              1. 生产者创建 Message 对象
                 ↓
              2. 发送到 Broker(写入 CommitLog)
                 ↓
              3. 构建 ConsumeQueue 和 IndexFile
                 ↓
              4. 消费者拉取消息(根据 Topic + Queue)
                 ↓
              5. 处理成功 → 提交 Offset
                 ↓
              6. 消息过期(默认 72 小时)→ 被删除
              

              ✅ 消息是持久化存储的,即使消费者未上线,消息也不会丢失。

              五、Message 的存储机制

              虽然 Message 是逻辑对象,但在 Broker 端有严格的物理存储结构:

              1.CommitLog

              • 所有消息按到达顺序追加写入 CommitLog 文件(顺序写,高性能)
              • 每个消息包含:Topic、Queue、Body、Properties 等完整信息

              2.ConsumeQueue

              • 每个 Topic 的每个 MessageQueue 对应一个 ConsumeQueue
              • 存储消息的逻辑偏移量、大小、物理位置,用于快速定位消息
              ConsumeQueue/{Topic}/{QueueId}/
                 ├── 00000000000000000000
                 └── ...
              

              3.IndexFile

              • 可选索引文件,支持通过 Keys 或时间范围 查询消息
              • 用于排查问题(如“查找某个订单的消息”)
              IndexFile/index_1714567890000
              

              六、Message 的发送方式回顾

              方式说明
              同步发送阻塞等待结果,适用于关键消息
              异步发送回调通知结果,高吞吐场景
              单向发送不关心结果,日志类消息
              事务消息半消息 + 本地事务 + 提编程客栈交/回滚

              所有方式发送的都是 Message 对象。

              七、最佳实践与注意事项

              实践说明
              ✅ 设置 Topic 和 Tag合理分类,便于管理和过滤
              ✅ 关键消息设置 Keys便于通过 mqadmin 查询
              ✅ 控制 Body 大小≤ 4MB,避免影响性能
              ✅ 使用 UTF-8 编码防止乱码
              ✅ 避免空 Body可能导致异常
              ✅ 合理使用延迟消息替代部分定时任务,但不要滥用
              ✅ 自定义属性用 putUserProperty避免覆盖系统属性

              八、常见问题排查

              问题原因解决方案
              MessageExt is null拉取超时或无消息正常现象,重试即可
              msg put message to store error消息过大或磁盘满检查大小限制和磁盘空间
              延迟消息未按时投递时间误差或 Broker 压力大接受轻微延迟,或使用外部调度系统
              通过 Key 查不到消息IndexFile 未生成或已过期检查 messageIndexEnable 配置
              消息重复网络重试、Rebalance消费者做幂等处理

              ✅ 总结:Message 核心要点

              维度说明
              角色消息传输的基本单元
              组成Topic + Body + Tags + Keys + Properties + Delay
              大小限制默认 ≤ 4MB
              存储方式顺序写 CommitLog,索引通过 ConsumeQueue 和 IndexFile
              可查询性支持按javascript Key、时间、Offset 查询
              扩展性支持自定义属性,灵活传递上下文
              高级功能支持延迟、事务、顺序消息

              一句话总结:

              Message 是 RocketMQ 的“数据包” —— 它不仅是业务数据的载体,更是路由、过滤、追踪、延迟、事务等功能的基础。

              设计好 Message 的结构与属性,才能让消息系统真正高效、可靠、易维护。

              掌握 Message,你就掌握了 RocketMQ 的“语言”。

              到此这篇关于RocketMQ 消息Message的结构和使用方式详解的文章就介绍到这了,更多相关RocketMQ 消息Message内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

              0

              上一篇:

              下一篇:

              精彩评论

              暂无评论...
              验证码 换一张
              取 消

              最新开发

              开发排行榜