
十年前选 ActiveMQ,五年前换 Kafka,现在 AI 时代来了——你的消息队列,该升维了。
"烽火连三月,家书抵万金。" —— 杜甫《春望》
老李所在的电商平台,每年双十一流量峰值 30 万 TPS,订单系统、库存系统、通知系统之间的同步调用链路一旦超时,就是金钱损失。他们用 Kafka 做流处理、用 RabbitMQ 做业务解耦,两套中间件、两套运维团队、两份故障预案——这就是"消息队列双轨制"的真实痛点。
而现在,AI Agent 正在成为系统里的"新角色":LLM 推理是秒级任务,但结果通知、任务编排、异步回调却没有统一的异步通道。老李需要一个能同时搞定金融级事务消息、流式日志、AI 任务编排的统一消息平台。
Apache RocketMQ 5.x,就是这个答案。
"工欲善其事,必先利其器。" —— 《论语·卫灵公》
Apache RocketMQ 是阿里开源、捐献给 Apache 基金会的分布式消息与流处理平台,当前最新稳定版为 5.3.3(2025-05-06),同期发布了 5.4.0(2025-12-24,引入优先级消息和 RocksDB 支持)。
与 4.x 的本质区别:5.x 引入了 Proxy 层(gRPC + HTTP 双协议)、Pop 消费模式、无状态 Broker 部署,真正实现了云原生化。
功能 | 说明 | 适用场景 |
|---|---|---|
普通消息 | 基础 Pub/Sub,高吞吐 | 日志采集、行为埋点 |
顺序消息 | 分区有序,全局有序 | 订单状态流转、账务流水 |
延迟/定时消息 | 支持任意时间精度 | 支付超时取消、活动提醒 |
事务消息 | 两阶段提交,最终一致 | 跨服务分布式事务 |
批量消息 | 单次最大 4MB | 日志批量上报 |
过滤消息 | Tag/SQL92 属性过滤 | 多租户隔离消费 |
优先级消息 | 5.4.0 新增,支持消息优先级 | AI 任务队列调度 |
消息轨迹 | 全链路追踪 | 合规审计、问题排查 |
ACL 2.0 | 精细化权限控制 | 企业多团队管控 |
流式消费 | 类 Kafka Consumer | 实时数据管道 |
"登高而招,臂非加长也,而见者远。" —— 荀子《劝学》

小结:5.x Proxy 层是关键创新——生产者/消费者只与 Proxy 通信,Broker 内部实现对客户端透明,扩缩容不影响业务。
"法者,治之端也。" —— 荀子《君道》
RocketMQ 使用Apache License 2.0。
权利 / 限制 | 个人用途 | 商业企业用途 |
|---|---|---|
免费使用、修改 | ✅ 完全自由 | ✅ 可商用,无需付费 |
二次分发修改版 | ✅ 允许 | ✅ 允许,需保留原始版权声明 |
私有化部署 | ✅ 不必开源 | ✅ 不强制开源改动 |
专利授权 | ✅ 贡献者授权免费使用 | ✅ 同上 |
商标使用 | ⚠️ 不得使用 Apache/RocketMQ 商标 | ⚠️ 同左 |
阿里云 RocketMQ | 💰 云托管版收费 | 💰 按量付费,与 OSS 版功能有差异 |
结论:Apache 2.0 是对商业最友好的开源协议之一。企业可以放心基于 RocketMQ 做产品,只需保留 LICENSE 文件即可。与 LGPL(受侵染)、GPL(必须开源)相比,Apache 2.0 没有"传染性"。
"纸上得来终觉浅,绝知此事要躬行。" —— 陆游
services:
namesrv:
image: apache/rocketmq:5.3.2
container_name: rmq-namesrv
ports:
- "9876:9876"
command: sh mqnamesrv
environment:
JAVA_OPT_EXT: "-Xms256m -Xmx256m"
networks:
- rocketmq
broker:
image: apache/rocketmq:5.3.2
container_name: rmq-broker
depends_on:
- namesrv
ports:
- "10909:10909"
- "10911:10911"
- "10912:10912"
environment:
NAMESRV_ADDR: "rmq-namesrv:9876"
JAVA_OPT_EXT: "-Xms512m -Xmx512m"
command: sh mqbroker -n rmq-namesrv:9876 -c /home/rocketmq/broker.conf
volumes:
- ./broker.conf:/home/rocketmq/broker.conf:ro
networks:
- rocketmq
proxy:
image: apache/rocketmq:5.3.2
container_name: rmq-proxy
depends_on:
- broker
- namesrv
ports:
- "18080:8080"
- "18081:8081"
environment:
NAMESRV_ADDR: "rmq-namesrv:9876"
JAVA_OPT_EXT: "-Xms256m -Xmx256m"
command: sh mqproxy
restart: on-failure
networks:
- rocketmq
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rmq-dashboard
depends_on:
- namesrv
ports:
- "9999:8080"
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
networks:
- rocketmq
networks:
rocketmq:
driver: bridge# 启动全部服务
docker compose up -d
# 查看服务状态
docker compose ps
# 查看 NameServer 日志
docker logs rmq-namesrv | grep "boot success"
# 验证 Broker 注册
docker exec rmq-broker sh mqadmin clusterList -n rmq-namesrv:9876访问 http://localhost:9999,可以看到以下功能页面:
┌─────────────────────────────────────────────────────┐
│ RocketMQ Dashboard 登录 │
├────────┬────────────────────────────────────────────┤
│ 驾驶舱 │ 集群概览 / TPS曲线 / 消息堆积趋势 │
│ 集群 │ Broker节点列表 / 内存/磁盘使用率 │
│ 主题 │ Topic管理 / 订阅关系 / 消息查询 │
│ 消费者 │ ConsumerGroup / 消费进度 / 积压告警 │
│ 生产者 │ ProducerGroup / 发送TPS统计 │
│ 消息 │ MessageId查询 / 消息轨迹 / 重发 │
└────────┴────────────────────────────────────────────┘
📸截图说明:Dashboard 主页展示 Broker 节点状态(绿色=在线),驾驶舱页面实时显示入队 TPS 和出队 TPS 折线图,消费者页面可以看到每个 ConsumerGroup 的消息积压数量(lag),这是运维最常盯的指标。
"博学之,审问之,慎思之,明辨之,笃行之。" —— 《中庸》
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>rocketmq:
name-server: 127.0.0.1:9876
producer:
group: demo-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2场景设计:用户下单 → 异步发送订单确认短信通知
// 生产者:OrderController.java
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/ POST /order/create?userId=1001&amount=299.00 /
@PostMapping("/create")
public String createOrder(String userId, BigDecimal amount) {
OrderMessage msg = new OrderMessage(userId, amount, "ORDER\_" + System.currentTimeMillis());
// 发送普通消息
rocketMQTemplate.convertAndSend("ORDER\_NOTIFY\_TOPIC", msg);
return "订单创建成功,通知已发送: " + msg.getOrderId();
}
}
// 消费者:OrderNotifyConsumer.java
@Component
@RocketMQMessageListener(
topic = "ORDER\_NOTIFY\_TOPIC",
consumerGroup = "order-notify-consumer"
)
public class OrderNotifyConsumer implements RocketMQListener<OrderMessage> {
@Override
public void onMessage(OrderMessage message) {
// 模拟发送短信
System.out.printf("[短信通知] 用户 %s,您的订单 %s 已创建,金额 %.2f 元%n",
message.getUserId(), message.getOrderId(), message.getAmount());
}
}验证:
curl -X POST "http://localhost:8080/order/create?userId=1001&amount=299.00"
# 控制台输出:[短信通知] 用户 1001,您的订单 ORDER\_xxx 已创建,金额 299.00 元场景设计:订单状态必须按 创建→支付→发货→完成 顺序处理
// 发送顺序消息(按 orderId 哈希分区)
@PostMapping("/order/status-flow")
public String orderStatusFlow(String orderId) {
String[] statuses = {"CREATED", "PAID", "SHIPPED", "COMPLETED"};
for (String status : statuses) {
// 第三个参数是 hashKey,相同 orderId 进同一队列(有序)
rocketMQTemplate.syncSendOrderly("ORDER\_STATUS\_TOPIC",
new OrderStatusMsg(orderId, status), orderId);
}
return "顺序消息发送完成";
}
// 消费者:顺序消费
@Component
@RocketMQMessageListener(
topic = "ORDER\_STATUS\_TOPIC",
consumerGroup = "order-status-consumer",
consumeMode = ConsumeMode.ORDERLY // 关键:顺序消费模式
)
public class OrderStatusConsumer implements RocketMQListener<OrderStatusMsg> {
@Override
public void onMessage(OrderStatusMsg msg) {
System.out.printf("[状态流转] 订单 %s → %s%n", msg.getOrderId(), msg.getStatus());
}
}场景设计:下单后 30 分钟未支付,自动取消订单
// 下单时发送延迟消息(5s 用于测试,生产用 1800000ms)
@PostMapping("/order/with-timeout")
public String orderWithTimeout(String orderId) {
Message<String> message = MessageBuilder
.withPayload(orderId)
.build();
// delayMillis: 30分钟 = 1800000ms,测试用 10000ms
rocketMQTemplate.syncSend("ORDER\_TIMEOUT\_TOPIC", message, 3000, 3);
// RocketMQ 内置级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
return "订单 " + orderId + " 已创建,30分钟内请支付";
}
@Component
@RocketMQMessageListener(topic = "ORDER\_TIMEOUT\_TOPIC", consumerGroup = "order-timeout-consumer")
public class OrderTimeoutConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String orderId) {
System.out.printf("[超时取消] 订单 %s 超时未支付,执行自动取消%n", orderId);
// 业务:查支付状态,未支付则取消订单
}
}场景设计:扣减库存与创建订单,保证最终一致性
// 发送事务消息
@PostMapping("/order/transaction")
public String transactionOrder(String productId, Integer quantity) {
String orderId = "TXN\_" + System.currentTimeMillis();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"ORDER\_TRANSACTION\_TOPIC",
MessageBuilder.withPayload(new TxOrderMsg(orderId, productId, quantity)).build(),
orderId // arg 传递给事务监听器
);
return "事务消息状态: " + result.getLocalTransactionState();
}
// 本地事务执行器
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String orderId = (String) arg;
// 执行本地数据库操作(扣库存、创建订单)
boolean success = inventoryService.deduct(orderId);
return success ? COMMIT : ROLLBACK;
} catch (Exception e) {
return UNKNOWN; // 触发回查
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// Broker 回查本地事务状态(防止网络问题导致状态丢失)
TxOrderMsg payload = JSON.parseObject(msg.getPayload().toString(), TxOrderMsg.class);
boolean exists = orderService.existsOrder(payload.getOrderId());
return exists ? COMMIT : ROLLBACK;
}
}场景设计:VIP 用户订单走高优先级通道消费
// 发送带 Tag 的消息
public void sendWithTag(String userId, boolean isVip, String orderId) {
String tag = isVip ? "VIP" : "NORMAL";
rocketMQTemplate.convertAndSend("ORDER\_FILTER\_TOPIC:" + tag,
new OrderMessage(userId, orderId));
}
// VIP 订单消费者(只消费 VIP Tag)
@Component
@RocketMQMessageListener(
topic = "ORDER\_FILTER\_TOPIC",
consumerGroup = "vip-order-consumer",
selectorType = SelectorType.TAG,
selectorExpression = "VIP" // 只消费 VIP 标签
)
public class VipOrderConsumer implements RocketMQListener<OrderMessage> {
@Override
public void onMessage(OrderMessage msg) {
System.out.println("[VIP专属] 高优先级处理订单: " + msg.getOrderId());
}
}场景:一个可视化的消息发送测试页面
<!-- templates/mq-demo.html (Thymeleaf) -->
<!DOCTYPE html>
<html>
<head>
<title>RocketMQ Demo Console</title>
<style>
body { font-family: 'Courier New', monospace; background: #1a1a2e; color: #e0e0e0; padding: 20px; }
.card { background: #16213e; border: 1px solid #0f3460; border-radius: 8px; padding: 20px; margin: 10px 0; }
button { background: #e94560; color: white; border: none; padding: 8px 20px; border-radius: 4px; cursor: pointer; }
input, select { background: #0f3460; color: white; border: 1px solid #e94560; padding: 6px; border-radius: 4px; }
.log { background: #000; padding: 10px; height: 200px; overflow-y: auto; font-size: 12px; }
</style>
</head>
<body>
<h2>🚀 RocketMQ 5.x Feature Demo</h2>
<div class="card">
<h3>消息类型测试</h3>
<select id="msgType">
<option value="normal">普通消息</option>
<option value="delay">延迟消息(10s)</option>
<option value="order">顺序消息</option>
<option value="transaction">事务消息</option>
</select>
<input id="payload" placeholder="消息内容" value="Hello RocketMQ 5.x" />
<button onclick="sendMsg()">发送</button>
</div>
<div class="card">
<h3>消费日志</h3>
<div class="log" id="log"></div>
</div>
<script>
function sendMsg() {
const type = document.getElementById('msgType').value;
const payload = document.getElementById('payload').value;
fetch(`/demo/send?type=${type}&payload=${encodeURIComponent(payload)}`)
.then(r => r.text())
.then(result => {
document.getElementById('log').innerHTML +=
`<div style="color:#4ecca3">[${new Date().toLocaleTimeString()}] ${result}</div>`;
});
}
// 每 2s 刷新消费日志
setInterval(() => {
fetch('/demo/logs').then(r => r.json()).then(logs => {
if (logs.length > 0) {
const logDiv = document.getElementById('log');
logDiv.innerHTML += logs.map(l =>
`<div style="color:#ffd460">[消费] ${l}</div>`).join('');
}
});
}, 2000);
</script>
</body>
</html>"凡事预则立,不预则废。" —— 《礼记·中庸》

# 步骤1:每台机器拉取镜像
docker pull apache/rocketmq:5.3.2
# 步骤2:启动2个 NameServer(两台机器分别执行)
docker run -d --name rmq-ns \
-p 9876:9876 \
--restart always \
apache/rocketmq:5.3.2 sh mqnamesrv
# 步骤3:broker.conf(Dledger 配置关键项)
cat > /data/rocketmq/broker-a.conf << EOF
brokerClusterName=RaftCluster
brokerName=broker-a
namesrvAddr=10.0.0.5:9876;10.0.0.6:9876
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-10.0.0.1:40911;n1-10.0.0.2:40911;n2-10.0.0.3:40911
dLegerSelfId=n0
storePathRootDir=/data/rocketmq/store
EOF
# 步骤4:启动 Broker(三副本 Raft 至少需要3节点)
docker run -d --name rmq-broker-a \
-p 10911:10911 -p 40911:40911 \
-v /data/rocketmq/broker-a.conf:/opt/rocketmq/conf/broker.conf \
-v /data/rocketmq/store:/root/store \
-e NAMESRV\_ADDR="10.0.0.5:9876;10.0.0.6:9876" \
--restart always \
apache/rocketmq:5.3.2 sh mqbroker -c /opt/rocketmq/conf/broker.conf
# 步骤5:验证集群状态
docker exec rmq-broker-a sh mqadmin clusterList -n 10.0.0.5:9876问题现象 | 原因 | 解决方案 |
|---|---|---|
| Topic 未创建,autoCreate 未开启 | 加 |
Consumer 一直 WAIT,不消费 | IP 不通,Broker 返回的内网 IP 客户端无法访问 | 设置 |
消息堆积,消费 TPS 低 | 消费线程数不足 | 调高 |
事务消息回查超时 | 本地事务执行超时 | 检查数据库慢查询,优化本地事务逻辑 |
Dashboard 连不上 Broker | 防火墙/容器网络隔离 | 确保 9876/10911/10909 端口可访问 |
| Broker 磁盘满 | 清理磁盘或扩容,检查 |
顺序消息消费乱序 | 使用了 CONCURRENTLY 模式 | 改为 |
ACL 鉴权失败 | 5.3.3+ 强制使用 ACL 2.0 | 删除旧 ACL 1.0 配置,使用新格式重新配置 |
"千里之行,始于足下。" —— 老子《道德经》
AI Coding 时代,大模型推理是同步阻塞的,但任务编排、结果分发、多 Agent 协同必须是异步解耦的。RocketMQ 在 AI 场景中扮演以下角色:

AI Coding 实践建议:
rocketmq-spring-boot-starter:2.3.2"知彼知己,百战不殆。" —— 《孙子兵法》
维度 | RocketMQ 5.x | Kafka 3.x | RabbitMQ 3.x |
|---|---|---|---|
消息模型 | 队列+流 双模 | 流式日志 | AMQP 路由 |
事务消息 | ✅ 原生支持 | ❌ 无 | ⚠️ 有限支持 |
延迟消息 | ✅ 任意精度(5.x) | ❌ 无 | ✅ 插件支持 |
顺序消息 | ✅ 分区顺序+全局顺序 | ✅ 分区顺序 | ⚠️ 有限 |
消息查询 | ✅ MsgId+Key+时间范围 | ❌ 基本无 | ❌ 无 |
消息轨迹 | ✅ 全链路追踪 | ❌ 无 | ❌ 无 |
吞吐量 | 10万+ TPS | 百万+ TPS | 1-5万 TPS |
适合场景 | 业务消息+金融事务 | 日志/流计算 | 任务队列/RPC |
云原生 | ✅ K8s Operator | ✅ | ⚠️ 一般 |
学习曲线 | 中等 | 中等 | 较低 |
# Kafka(KRaft 模式,无需 ZooKeeper)
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA\_CFG\_NODE\_ID=0 \
-e KAFKA\_CFG\_PROCESS\_ROLES=controller,broker \
-e KAFKA\_CFG\_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA\_CFG\_CONTROLLER\_QUORUM\_VOTERS=0@localhost:9093 \
bitnami/kafka:3.7# RabbitMQ(含管理界面)
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ\_DEFAULT\_USER=admin \
-e RABBITMQ\_DEFAULT\_PASS=admin123 \
rabbitmq:3-management
# 访问:http://localhost:15672老李结论:如果你的系统强依赖事务消息、延迟消息、消息轨迹,选 RocketMQ;纯日志流处理选 Kafka;简单任务队列选 RabbitMQ。三者并非竞争关系,大厂通常混用。
"为学日益,为道日损。" —— 老子

结合 RocketMQ 使用场景,推荐以下 Claude Skill:
Skill | 理由 |
|---|---|
claude_code_terminal | 用 Claude Code 直接生成和调试 Docker Compose 配置、Java 代码,AI 辅助减少 80% 配置错误 |
claude_code_vscode | 在 VSCode 中让 Claude 实时补全 |
excel | 将 RocketMQ 监控指标导出为 Excel,做容量规划分析和告警阈值设定 |
强烈推荐 Claude Code:在 AI 时代,RocketMQ 的 Producer/Consumer 模板代码完全可以由 Claude Code 自动生成,只需告知:版本、消息类型、业务场景描述。典型 Prompt:
使用 rocketmq-spring-boot-starter:2.3.2,
生成一个处理电商订单超时取消的延迟消息 Consumer,
消息体包含 orderId 和 createTime,
消费时检查数据库支付状态后决定是否取消订单
阶段 | 关键决策 | 推荐选择 |
|---|---|---|
技术选型 | 业务消息 vs 日志流 | RocketMQ vs Kafka |
消息类型 | 普通/顺序/延迟/事务 | 按业务一致性要求选 |
单机验证 | Docker Compose | 5.3.2 含 Proxy+Dashboard |
高可用 | 3节点 Dledger Raft | 自动选主,无单点 |
监控运维 | Dashboard + Prometheus | 重点看 Lag 和磁盘 |
AI 集成 | 异步任务队列 | 优先级消息+事务消息 |
开发提效 | AI Coding | Claude Code 生成模板 |
协议合规 | Apache 2.0 | 商业友好,可放心使用 |
总结:RocketMQ 5.x 不只是消息队列,它是分布式系统的异步神经中枢。在 AI Agent 爆发的 2025 年,一个能同时搞定事务一致性和 AI 任务编排的消息平台,比你以为的更重要。部署一套、跑通 Demo、接入监控,三步走,今天就能上生产。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。