RocketMQ 消息Message的结构和使用方式详解
目录
- RocketMQ 消息(Message)详解
- 一、什么是 Message?
- ✅ 定义:
- 二、Message 的核心结构
- 三、Message 各字段详解
- 1.Topic(主题)
- 2.Body(消息体)
- 3.Tags(标签)
- 4.Keys(消息键)
- 5.Properties(属性)
- 6.DelayTimeLevel(延迟级别)
- 四、Message 的生命周期
- 五、Message 的存储机制
- 1.http://www.devze.comCommitLog
- 2.ConsumeQueue
- 3.IndexFile
- 六、Message 的发送方式回顾
- 七、最佳实践与注意事http://www.devze.com项
- 八、常见问题排查
- ✅ 总结:Message 核心要点
RocketMQ 消息(Message)详解
在 Apache RocketMQ 中,消息(Message) 是数据传输的最小单元,是生产者与消费者之间通信的“载体”。理解 Message
的结构、属性、生命周期和使用方式,是掌握 RocketMQ 的核心基础。
推荐阅读:深入理解Apache RocketMQ 中Message 消息的核心概念
一、什么是 Message?
✅ 定义:
Mes编程sage 是 RocketMQ 中封装实际业务数据的对象,包含消息体(Body)和一系列元数据(如 Topic、Tag、Key、Properties 等),用于在生产者与消费者之间传递信息。
类比:就像一封信,信纸是内容(Body),信封上写着收件人(Topic)、标签(Tag)、编号(Key)等信息。
二、Message 的核心结构
一个 Message
对象主要由以下几个部分组成:
字段 | 类型 | 是否必填 | 说明 |
---|---|---|---|
Topic | String | ✅ 必填 | 消息所属的主题,用于路由和分类 |
Body | byte[] | ✅ 必填 | 消息的实际内容,通常为序列化后的 jsON、Protobuf 等 |
Tags | String | ❌ 可选 | 子分类标签,用于消费者过滤(如 CREATE , CANCEL ) |
Keys | String | ❌ 可选 | 消息的唯一键或业务主键(如订单号),用于排查、索引 |
Flag | int | ❌ 可选 | 消息标志位(如是否压缩) |
DelayTimeLevel | int | ❌ 可选 | 延迟消息级别(1~18),实现定时投递 |
Properties | Map<String, String> | ❌ 可选 | 自定义属性,RocketMQ 内部也使用它存储系统属性 |
三、Message 各字段详解
1.Topic(主题)
- 消息的逻辑分类,决定消息被发送到哪个队列。
- 必须提前创建或允许自动创建。
- 示例:
ORDER_TOPIC
,USER_LOG_TOPIC
new Message("ORDER_TOPIC", ...);
2.Body(消息体)
- 实际传输的数据,必须是字节数组。
- 通常通过 JSON、Protobuf、Hessian 等序列化框架编码。
String content = "{\"orderId\":\"1001\",\"userId\":10086}"; Message msg = new Message(topic, tag, content.getBytes(StandardCharsets.UTF_8));
⚠️ 注意:
- 单条消息大小默认最大 4MB(可配置)
- 过大消息会影响性能,建议拆分或使用外部存储(如上传文件后传 URL)
3.Tags(标签)
- 用于对同一 Topic 下的消息进行二次分类。
- 消费者可通过
subscribe("Topic", "TagA || TagB")
进行过滤。
// 发送 new Message("ORDER_TOPIC", "CREATE", "创建订单".getBytes()); new Message("ORDER_TOPIC", "PAY", "支付完成".getBytes()); // 订阅 CREATE 类型消息 consumer.subscribe("ORDER_TOPIC", "CREATE");
✅ 优势:轻量级过滤,避免消费者接收无关消息。
⚠️ 注意:Tags 是字符串匹配,不支持正则(但支持
*
通配和||
多选)
4.Keys(消息键)
- 为消息设置唯一标识或业务主键(如订单号、用户ID)。
- 支持通过
mqadmin queryMsgByKey
命令查询消息。 - 支持索引,便于问题排查。
Message msg = new Message(...); msg.setKeys("ORDER_20240501001");
✅ 建议:关键业务消息务必设置 Keys,便于追踪。
5.Properties(属性)
- 键值对形式的扩展字段,可用于:
- 存储自定义上下文(如 traceId、tenantId)
- RocketMQ 内部使用(如
RECONSUME_TIME
、DELAY
、TRAN_MSG
)
msg.putUserProperty("traceId", "abc123"); msg.putUserProperty("source", "web");
⚠️ 注意:系统属性以
PREFIX_SYS_PROP
开头,不要冲突。
6.DelayTimeLevel(延迟级别)
- 设置消息延迟投递时间,实现“定时任务”功能。
- 取值范围:1~18,对应不同延迟时间:
级别 | 时间 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
Message msg = new Message("DELAY_TOPIC", "TAG", "延迟消息".getBytes()); msg.setDelayTimeLevel(5); // 延迟1分钟 producer.send(msg);
⚠️ 注意:延迟消息不保证精确时间,存在轻微误差。
四、Message 的生命周期
1. 生产者创建 Message 对象 ↓ 2. 发送到 Broker(写入 CommitLog) ↓ 3. 构建 ConsumeQueue 和 IndexFile ↓ 4. 消费者拉取消息(根据 Topic + Queue) ↓ 5. 处理成功 → 提交 Offset ↓ 6. 消息过期(默认 72 小时)→ 被删除
✅ 消息是持久化存储的,即使消费者未上线,消息也不会丢失。
五、Message 的存储机制
虽然 Message 是逻辑对象,但在 Broker 端有严格的物理存储结构:
1.CommitLog
- 所有消息按到达顺序追加写入 CommitLog 文件(顺序写,高性能)
- 每个消息包含:Topic、Queue、Body、Properties 等完整信息
2.ConsumeQueue
- 每个 Topic 的每个 MessageQueue 对应一个 ConsumeQueue
- 存储消息的逻辑偏移量、大小、物理位置,用于快速定位消息
ConsumeQueue/{Topic}/{QueueId}/ ├── 00000000000000000000 └── ...
3.IndexFile
- 可选索引文件,支持通过 Keys 或时间范围 查询消息
- 用于排查问题(如“查找某个订单的消息”)
IndexFile/index_1714567890000
六、Message 的发送方式回顾
方式 | 说明 |
---|---|
同步发送 | 阻塞等待结果,适用于关键消息 |
异步发送 | 回调通知结果,高吞吐场景 |
单向发送 | 不关心结果,日志类消息 |
事务消息 | 半消息 + 本地事务 + 提编程客栈交/回滚 |
所有方式发送的都是
Message
对象。
七、最佳实践与注意事项
实践 | 说明 |
---|---|
✅ 设置 Topic 和 Tag | 合理分类,便于管理和过滤 |
✅ 关键消息设置 Keys | 便于通过 mqadmin 查询 |
✅ 控制 Body 大小 | ≤ 4MB,避免影响性能 |
✅ 使用 UTF-8 编码 | 防止乱码 |
✅ 避免空 Body | 可能导致异常 |
✅ 合理使用延迟消息 | 替代部分定时任务,但不要滥用 |
✅ 自定义属性用 putUserProperty | 避免覆盖系统属性 |
八、常见问题排查
问题 | 原因 | 解决方案 |
---|---|---|
MessageExt is null | 拉取超时或无消息 | 正常现象,重试即可 |
msg put message to store error | 消息过大或磁盘满 | 检查大小限制和磁盘空间 |
延迟消息未按时投递 | 时间误差或 Broker 压力大 | 接受轻微延迟,或使用外部调度系统 |
通过 Key 查不到消息 | IndexFile 未生成或已过期 | 检查 messageIndexEnable 配置 |
消息重复 | 网络重试、Rebalance | 消费者做幂等处理 |
✅ 总结:Message 核心要点
维度 | 说明 |
---|---|
角色 | 消息传输的基本单元 |
组成 | Topic + Body + Tags + Keys + Properties + Delay |
大小限制 | 默认 ≤ 4MB |
存储方式 | 顺序写 CommitLog,索引通过 ConsumeQueue 和 IndexFile |
可查询性 | 支持按javascript Key、时间、Offset 查询 |
扩展性 | 支持自定义属性,灵活传递上下文 |
高级功能 | 支持延迟、事务、顺序消息 |
一句话总结:
Message 是 RocketMQ 的“数据包” —— 它不仅是业务数据的载体,更是路由、过滤、追踪、延迟、事务等功能的基础。
设计好 Message 的结构与属性,才能让消息系统真正高效、可靠、易维护。
掌握 Message
,你就掌握了 RocketMQ 的“语言”。
到此这篇关于RocketMQ 消息Message的结构和使用方式详解的文章就介绍到这了,更多相关RocketMQ 消息Message内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论