RocketMQ 作为阿里开源的分布式消息中间件,凭借高吞吐量、低延迟、高可靠性等特性,被广泛应用于分布式系统的异步通信、解耦、削峰填谷等场景。Spring Boot 作为主流的微服务开发框架,其自动配置机制能极大简化与第三方组件的整合过程。本文将从实战角度出发,详细讲解 Spring Boot 如何整合 RocketMQ,覆盖普通消息发送、消息消费、消息重试等核心流程,并提供完整的代码示例。
一、环境准备
1. 基础环境依赖
- JDK 8 及以上(RocketMQ 对 JDK 版本有一定要求,推荐 8/11)
- Maven 3.6+(项目构建工具)
- RocketMQ 4.9.7(本文使用的稳定版本,可根据需求选择最新版本)
- Spring Boot 2.7.15(与 RocketMQ Starter 适配的版本)
2. RocketMQ 服务部署
首先需要搭建 RocketMQ 服务环境,可选择本地单机部署或集群部署,本文以本地单机为例:
- 从 RocketMQ 官网 下载对应版本的安装包,解压到本地目录。
- 启动 NameServer:
# 进入 RocketMQ 解压目录的 bin 文件夹 cd rocketmq-all-4.9.7-bin-release/bin # 启动 NameServer(Windows 系统执行 mqnamesrv.cmd) nohup sh mqnamesrv & - 启动 Broker:
# 启动 Broker(Windows 系统执行 mqbroker.cmd) nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &注:
autoCreateTopicEnable=true表示自动创建主题,方便测试,生产环境建议提前手动创建主题。
二、项目初始化与依赖配置
1. 创建 Spring Boot 项目
通过 Spring Initializr(https://start.spring.io/)创建一个 Spring Boot 项目,选择基础依赖(如 Spring Web),也可手动创建 Maven 项目并配置 pom.xml。
2. 引入 RocketMQ 依赖
在 pom.xml 中添加 RocketMQ Spring Boot Starter 依赖,注意版本适配(本文使用 2.2.3 版本,与 RocketMQ 4.9.7 适配):
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.15</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-boot-rocketmq-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-rocketmq-demo</name> <description>Spring Boot RocketMQ 整合示例</description> <properties> <java.version>1.8</java.version> <rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version> </properties> <dependencies> <!-- Spring Web 依赖,用于提供接口测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RocketMQ Spring Boot Starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-spring-boot-starter.version}</version> </dependency> <!-- 测试依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>3. 配置 RocketMQ 连接信息
在application.yml(或 application.properties)中配置 RocketMQ 的 NameServer 地址、生产者组等信息:
spring: application: name: spring-boot-rocketmq-demo # RocketMQ 配置 rocketmq: # NameServer 地址,多个地址用分号分隔 name-server: localhost:9876 # 生产者配置 producer: # 生产者组名,必须唯一 group: demo-producer-group # 发送消息的超时时间,默认 3000ms send-message-timeout: 3000 # 消息体最大长度,默认 4MB max-message-size: 4194304 # 压缩消息的阈值,默认 4KB compress-message-body-threshold: 4096 # 重试次数,默认 2 次 retry-times-when-send-failed: 2 # 异步发送失败时是否重试其他 Broker,默认 false retry-next-server: false三、消息发送:普通消息、同步 / 异步 / 单向发送
RocketMQ 支持同步发送、异步发送、单向发送三种消息发送方式,适用于不同的业务场景:
- 同步发送:发送后等待 Broker 响应,可靠性最高,适用于重要消息(如订单创建)。
- 异步发送:发送后不阻塞,通过回调函数处理响应,适用于需要高吞吐量且允许少量延迟的场景。
- 单向发送:发送后不等待响应,性能最高,可靠性最低,适用于日志收集等不重要消息。
1. 封装消息发送工具类
创建RocketMQProducerService类,注入RocketMQTemplate(由 RocketMQ Starter 提供的模板类,简化消息发送),实现三种发送方式:
package com.example.demo.service; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * RocketMQ 生产者服务 */ @Service public class RocketMQProducerService { // 注入 RocketMQ 模板类 @Resource private RocketMQTemplate rocketMQTemplate; /** * 同步发送消息 * @param topic 主题名(可携带标签,格式:topic:tag) * @param message 消息内容 * @return 发送结果 */ public SendResult sendSyncMessage(String topic, String message) { // 构建消息(可添加消息头,如自定义 key) Message<String> msg = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, "sync-key-" + System.currentTimeMillis()) .build(); // 同步发送 return rocketMQTemplate.syncSend(topic, msg); } /** * 异步发送消息 * @param topic 主题名 * @param message 消息内容 */ public void sendAsyncMessage(String topic, String message) { Message<String> msg = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, "async-key-" + System.currentTimeMillis()) .build(); // 异步发送,通过 SendCallback 处理回调 rocketMQTemplate.asyncSend(topic, msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 发送成功处理 System.out.println("异步发送消息成功:" + sendResult); } @Override public void onException(Throwable e) { // 发送失败处理 System.err.println("异步发送消息失败:" + e.getMessage()); } }); } /** * 单向发送消息(不关心发送结果) * @param topic 主题名 * @param message 消息内容 */ public void sendOneWayMessage(String topic, String message) { Message<String> msg = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, "oneway-key-" + System.currentTimeMillis()) .build(); // 单向发送 rocketMQTemplate.sendOneWay(topic, msg); } }2. 编写测试接口
创建MessageSendController控制器,提供 HTTP 接口测试消息发送:
package com.example.demo.controller; import org.apache.rocketmq.client.producer.SendResult; import com.example.demo.service.RocketMQProducerService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 消息发送测试控制器 */ @RestController @RequestMapping("/message") public class MessageSendController { @Resource private RocketMQProducerService rocketMQProducerService; // 测试同步发送 @GetMapping("/sync/send") public String sendSyncMessage(@RequestParam String msg) { // 主题名:demo-topic,标签:demo-tag(标签用于消息过滤) String topic = "demo-topic:demo-tag"; SendResult sendResult = rocketMQProducerService.sendSyncMessage(topic, msg); return "同步发送消息成功:" + sendResult; } // 测试异步发送 @GetMapping("/async/send") public String sendAsyncMessage(@RequestParam String msg) { String topic = "demo-topic:demo-tag"; rocketMQProducerService.sendAsyncMessage(topic, msg); return "异步发送消息请求已提交"; } // 测试单向发送 @GetMapping("/oneway/send") public String sendOneWayMessage(@RequestParam String msg) { String topic = "demo-topic:demo-tag"; rocketMQProducerService.sendOneWayMessage(topic, msg); return "单向发送消息完成"; } }四、消息消费:消费者配置与消息监听
RocketMQ 的消费端通过消息监听器监听指定主题的消息,Spring Boot 整合后可通过注解@RocketMQMessageListener快速实现消费逻辑。
1. 编写消息消费者
创建RocketMQConsumerService类,实现RocketMQListener接口,通过注解配置消费者组、监听的主题和标签:
package com.example.demo.consumer; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * RocketMQ 消费者服务 * 注解说明: * - consumerGroup:消费者组名,必须唯一 * - topic:监听的主题名 * - selectorExpression:标签表达式,* 表示所有标签,也可指定具体标签(如 demo-tag) * - messageModel:消息模式,CLUSTERING(集群模式)/BROADCASTING(广播模式),默认集群模式 * - consumeMode:消费模式,CONCURRENTLY(并发消费)/ORDERLY(顺序消费),默认并发消费 */ @Component @RocketMQMessageListener( consumerGroup = "demo-consumer-group", topic = "demo-topic", selectorExpression = "*", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY ) public class RocketMQConsumerService implements RocketMQListener<String> { /** * 消息消费逻辑 * @param message 消息内容 */ @Override public void onMessage(String message) { System.out.println("接收到消息:" + message); // 模拟业务处理 // 注意:消费端抛出异常会触发消息重试 // handleBusiness(message); } /** * 模拟业务处理 * @param message 消息内容 */ private void handleBusiness(String message) { // 业务逻辑代码 } }2. 消费者核心配置说明
- 消费者组(consumerGroup):必须唯一,同一组的消费者共同消费主题的消息(集群模式下)。
- 消息模式(messageModel):
- CLUSTERING(集群模式):同组消费者分摊消费消息,一条消息仅被一个消费者消费,默认模式。
- BROADCASTING(广播模式):同组消费者都会消费同一条消息,适用于通知类消息(如配置更新)。
- 消费模式(consumeMode):
- CONCURRENTLY(并发消费):多线程并发消费,消费速度快,默认模式。
- ORDERLY(顺序消费):单线程消费,保证消息按顺序消费,适用于有序消息场景(如订单状态变更)。
- 标签表达式(selectorExpression):用于过滤消息,支持
*(所有标签)、tag1 || tag2(多个标签)等表达式。
五、消息重试:消费失败后的重试机制
在实际业务中,消息消费可能因网络异常、业务处理失败等原因失败,RocketMQ 提供了消费重试机制,确保消息被成功消费。
1. 重试机制原理
- 默认重试:当消费者消费消息时抛出异常,RocketMQ 会将消息重新放入队列,等待重试消费,默认重试次数为 16 次(每次重试的间隔时间逐渐增加:1s、5s、10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min)。
- 死信队列:当消息重试 16 次后仍消费失败,会被发送到死信队列(DLQ),死信队列的命名规则为:
%DLQ%+消费者组名,可通过消费死信队列的消息进行人工处理。
2. 自定义重试配置与异常处理
(1)配置消费重试次数
在application.yml中添加消费者的重试配置(也可通过注解属性配置):
# 消费者重试配置(可在注解中覆盖) rocketmq: consumer: # 消费线程数 consume-thread-max: 20 # 批量消费的最大消息数 consume-message-batch-max-size: 1 # 消费超时时间 consume-timeout: 15(2)手动控制重试:返回消费状态
上述示例中,消费者实现的是RocketMQListener接口,无法手动控制消费状态,若需要自定义重试逻辑,可实现RocketMQPushConsumerListener接口(或使用MessageListenerConcurrently),返回ConsumeConcurrentlyStatus枚举:
package com.example.demo.consumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; import java.util.List; /** * 自定义重试逻辑的消费者 */ @Component @RocketMQMessageListener( consumerGroup = "demo-consumer-group-retry", topic = "demo-topic", selectorExpression = "*" ) public class RetryRocketMQConsumerService implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt messageExt) { String message = new String(messageExt.getBody()); System.out.println("接收到消息(带重试逻辑):" + message + ",重试次数:" + messageExt.getReconsumeTimes()); try { // 模拟业务处理失败 int a = 1 / 0; // 业务处理成功,无需重试 } catch (Exception e) { System.err.println("消息消费失败:" + e.getMessage()); // 若重试次数超过 3 次,直接返回成功(不再重试),否则抛出异常触发重试 if (messageExt.getReconsumeTimes() >= 3) { System.out.println("消息重试次数已达上限,不再重试"); // 可将消息记录到数据库,后续人工处理 return; } // 抛出异常,触发重试 throw new RuntimeException("消费失败,触发重试"); } } @Override public void prepareStart(org.apache.rocketmq.client.consumer.DefaultMQPushConsumer consumer) { // 可自定义消费者配置,如设置重试次数 try { // 设置消费线程数 consumer.setConsumeThreadMax(20); // 设置消息监听器(若需要更细粒度的控制) consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { onMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); } catch (MQClientException e) { throw new RuntimeException(e); } } }3. 死信队列处理
当消息进入死信队列后,可创建专门的消费者监听死信队列,进行人工干预处理:
package com.example.demo.consumer; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 死信队列消费者 * 死信队列名称:%DLQ% + 原消费者组名 */ @Component @RocketMQMessageListener( consumerGroup = "dlq-consumer-group", topic = "%DLQ%demo-consumer-group", selectorExpression = "*" ) public class DlqRocketMQConsumerService implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("接收到死信队列消息:" + message); // 人工处理逻辑,如记录日志、通知运维、手动重试等 } }六、测试验证
1. 启动项目
运行 Spring Boot 项目的主类,确保 RocketMQ NameServer 和 Broker 已启动。
2. 发送消息测试
通过 HTTP 接口发送消息,例如:
- 同步发送:
http://localhost:8080/message/sync/send?msg=Hello RocketMQ Sync - 异步发送:
http://localhost:8080/message/async/send?msg=Hello RocketMQ Async - 单向发送:
http://localhost:8080/message/oneway/send?msg=Hello RocketMQ OneWay
3. 验证消费与重试
观察控制台输出,可看到消费者成功接收消息;若在消费端模拟业务异常(如除以 0),可看到消息重试的日志,重试次数达到上限后进入死信队列。
七、生产环境注意事项
- 主题与消费者组规划:提前手动创建主题(关闭自动创建),消费者组名需唯一且有明确的业务含义。
- 消息重试配置:根据业务场景调整重试次数和间隔,避免无效重试占用资源。
- 死信队列处理:建立死信队列的监控和处理机制,防止消息丢失。
- 消息幂等性:由于消息重试,消费端需保证幂等性(如通过消息 key 去重)。
- 监控与告警:接入 RocketMQ 监控平台(如 RocketMQ Dashboard),监控消息发送 / 消费情况,设置告警机制。
- 集群部署:生产环境中 RocketMQ 需采用集群部署,保证高可用。
八、总结
本文详细介绍了 Spring Boot 整合 RocketMQ 的全流程,包括环境搭建、依赖配置、消息发送(同步 / 异步 / 单向)、消息消费(集群 / 广播、并发 / 顺序)、消息重试与死信队列处理,并提供了完整的代码示例。通过本文的实战内容,你可以快速掌握 RocketMQ 在 Spring Boot 项目中的核心使用方式,并根据实际业务场景进行扩展和优化。RocketMQ 的功能远不止于此,后续还可深入学习顺序消息、事务消息、延迟消息等高级特性,进一步满足复杂的业务需求。