news 2026/3/27 21:55:28

Spring Boot 整合 RocketMQ 实战:消息发送、消费、重试全流程代码示例

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring Boot 整合 RocketMQ 实战:消息发送、消费、重试全流程代码示例

RocketMQ 作为阿里开源的分布式消息中间件,凭借高吞吐量、低延迟、高可靠性等特性,被广泛应用于分布式系统的异步通信、解耦、削峰填谷等场景。Spring Boot 作为主流的微服务开发框架,其自动配置机制能极大简化与第三方组件的整合过程。本文将从实战角度出发,详细讲解 Spring Boot 如何整合 RocketMQ,覆盖普通消息发送、消息消费、消息重试等核心流程,并提供完整的代码示例。

一、环境准备

1. 基础环境依赖

  • JDK 8 及以上(RocketMQ 对 JDK 版本有一定要求,推荐 8/11)
  • Maven 3.6+(项目构建工具)
  • RocketMQ 4.9.7(本文使用的稳定版本,可根据需求选择最新版本)
  • Spring Boot 2.7.15(与 RocketMQ Starter 适配的版本)

2. RocketMQ 服务部署

首先需要搭建 RocketMQ 服务环境,可选择本地单机部署集群部署,本文以本地单机为例:

  1. 从 RocketMQ 官网 下载对应版本的安装包,解压到本地目录。
  2. 启动 NameServer:
    # 进入 RocketMQ 解压目录的 bin 文件夹 cd rocketmq-all-4.9.7-bin-release/bin # 启动 NameServer(Windows 系统执行 mqnamesrv.cmd) nohup sh mqnamesrv &
  3. 启动 Broker:
    # 启动 Broker(Windows 系统执行 mqbroker.cmd) nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &

    注:autoCreateTopicEnable=true表示自动创建主题,方便测试,生产环境建议提前手动创建主题。

二、项目初始化与依赖配置

1. 创建 Spring Boot 项目

通过 Spring Initializr(https://start.spring.io/)创建一个 Spring Boot 项目,选择基础依赖(如 Spring Web),也可手动创建 Maven 项目并配置 pom.xml。

2. 引入 RocketMQ 依赖

在 pom.xml 中添加 RocketMQ Spring Boot Starter 依赖,注意版本适配(本文使用 2.2.3 版本,与 RocketMQ 4.9.7 适配):

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.15</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-boot-rocketmq-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-rocketmq-demo</name> <description>Spring Boot RocketMQ 整合示例</description> <properties> <java.version>1.8</java.version> <rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version> </properties> <dependencies> <!-- Spring Web 依赖,用于提供接口测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RocketMQ Spring Boot Starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-spring-boot-starter.version}</version> </dependency> <!-- 测试依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

3. 配置 RocketMQ 连接信息

application.yml(或 application.properties)中配置 RocketMQ 的 NameServer 地址、生产者组等信息:

spring: application: name: spring-boot-rocketmq-demo # RocketMQ 配置 rocketmq: # NameServer 地址,多个地址用分号分隔 name-server: localhost:9876 # 生产者配置 producer: # 生产者组名,必须唯一 group: demo-producer-group # 发送消息的超时时间,默认 3000ms send-message-timeout: 3000 # 消息体最大长度,默认 4MB max-message-size: 4194304 # 压缩消息的阈值,默认 4KB compress-message-body-threshold: 4096 # 重试次数,默认 2 次 retry-times-when-send-failed: 2 # 异步发送失败时是否重试其他 Broker,默认 false retry-next-server: false

三、消息发送:普通消息、同步 / 异步 / 单向发送

RocketMQ 支持同步发送异步发送单向发送三种消息发送方式,适用于不同的业务场景:

  • 同步发送:发送后等待 Broker 响应,可靠性最高,适用于重要消息(如订单创建)。
  • 异步发送:发送后不阻塞,通过回调函数处理响应,适用于需要高吞吐量且允许少量延迟的场景。
  • 单向发送:发送后不等待响应,性能最高,可靠性最低,适用于日志收集等不重要消息。

1. 封装消息发送工具类

创建RocketMQProducerService类,注入RocketMQTemplate(由 RocketMQ Starter 提供的模板类,简化消息发送),实现三种发送方式:

package com.example.demo.service; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * RocketMQ 生产者服务 */ @Service public class RocketMQProducerService { // 注入 RocketMQ 模板类 @Resource private RocketMQTemplate rocketMQTemplate; /** * 同步发送消息 * @param topic 主题名(可携带标签,格式:topic:tag) * @param message 消息内容 * @return 发送结果 */ public SendResult sendSyncMessage(String topic, String message) { // 构建消息(可添加消息头,如自定义 key) Message<String> msg = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, "sync-key-" + System.currentTimeMillis()) .build(); // 同步发送 return rocketMQTemplate.syncSend(topic, msg); } /** * 异步发送消息 * @param topic 主题名 * @param message 消息内容 */ public void sendAsyncMessage(String topic, String message) { Message<String> msg = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, "async-key-" + System.currentTimeMillis()) .build(); // 异步发送,通过 SendCallback 处理回调 rocketMQTemplate.asyncSend(topic, msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 发送成功处理 System.out.println("异步发送消息成功:" + sendResult); } @Override public void onException(Throwable e) { // 发送失败处理 System.err.println("异步发送消息失败:" + e.getMessage()); } }); } /** * 单向发送消息(不关心发送结果) * @param topic 主题名 * @param message 消息内容 */ public void sendOneWayMessage(String topic, String message) { Message<String> msg = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, "oneway-key-" + System.currentTimeMillis()) .build(); // 单向发送 rocketMQTemplate.sendOneWay(topic, msg); } }

2. 编写测试接口

创建MessageSendController控制器,提供 HTTP 接口测试消息发送:

package com.example.demo.controller; import org.apache.rocketmq.client.producer.SendResult; import com.example.demo.service.RocketMQProducerService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 消息发送测试控制器 */ @RestController @RequestMapping("/message") public class MessageSendController { @Resource private RocketMQProducerService rocketMQProducerService; // 测试同步发送 @GetMapping("/sync/send") public String sendSyncMessage(@RequestParam String msg) { // 主题名:demo-topic,标签:demo-tag(标签用于消息过滤) String topic = "demo-topic:demo-tag"; SendResult sendResult = rocketMQProducerService.sendSyncMessage(topic, msg); return "同步发送消息成功:" + sendResult; } // 测试异步发送 @GetMapping("/async/send") public String sendAsyncMessage(@RequestParam String msg) { String topic = "demo-topic:demo-tag"; rocketMQProducerService.sendAsyncMessage(topic, msg); return "异步发送消息请求已提交"; } // 测试单向发送 @GetMapping("/oneway/send") public String sendOneWayMessage(@RequestParam String msg) { String topic = "demo-topic:demo-tag"; rocketMQProducerService.sendOneWayMessage(topic, msg); return "单向发送消息完成"; } }

四、消息消费:消费者配置与消息监听

RocketMQ 的消费端通过消息监听器监听指定主题的消息,Spring Boot 整合后可通过注解@RocketMQMessageListener快速实现消费逻辑。

1. 编写消息消费者

创建RocketMQConsumerService类,实现RocketMQListener接口,通过注解配置消费者组、监听的主题和标签:

package com.example.demo.consumer; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * RocketMQ 消费者服务 * 注解说明: * - consumerGroup:消费者组名,必须唯一 * - topic:监听的主题名 * - selectorExpression:标签表达式,* 表示所有标签,也可指定具体标签(如 demo-tag) * - messageModel:消息模式,CLUSTERING(集群模式)/BROADCASTING(广播模式),默认集群模式 * - consumeMode:消费模式,CONCURRENTLY(并发消费)/ORDERLY(顺序消费),默认并发消费 */ @Component @RocketMQMessageListener( consumerGroup = "demo-consumer-group", topic = "demo-topic", selectorExpression = "*", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY ) public class RocketMQConsumerService implements RocketMQListener<String> { /** * 消息消费逻辑 * @param message 消息内容 */ @Override public void onMessage(String message) { System.out.println("接收到消息:" + message); // 模拟业务处理 // 注意:消费端抛出异常会触发消息重试 // handleBusiness(message); } /** * 模拟业务处理 * @param message 消息内容 */ private void handleBusiness(String message) { // 业务逻辑代码 } }

2. 消费者核心配置说明

  • 消费者组(consumerGroup):必须唯一,同一组的消费者共同消费主题的消息(集群模式下)。
  • 消息模式(messageModel)
    • CLUSTERING(集群模式):同组消费者分摊消费消息,一条消息仅被一个消费者消费,默认模式。
    • BROADCASTING(广播模式):同组消费者都会消费同一条消息,适用于通知类消息(如配置更新)。
  • 消费模式(consumeMode)
    • CONCURRENTLY(并发消费):多线程并发消费,消费速度快,默认模式。
    • ORDERLY(顺序消费):单线程消费,保证消息按顺序消费,适用于有序消息场景(如订单状态变更)。
  • 标签表达式(selectorExpression):用于过滤消息,支持*(所有标签)、tag1 || tag2(多个标签)等表达式。

五、消息重试:消费失败后的重试机制

在实际业务中,消息消费可能因网络异常、业务处理失败等原因失败,RocketMQ 提供了消费重试机制,确保消息被成功消费。

1. 重试机制原理

  • 默认重试:当消费者消费消息时抛出异常,RocketMQ 会将消息重新放入队列,等待重试消费,默认重试次数为 16 次(每次重试的间隔时间逐渐增加:1s、5s、10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min)。
  • 死信队列:当消息重试 16 次后仍消费失败,会被发送到死信队列(DLQ),死信队列的命名规则为:%DLQ%+消费者组名,可通过消费死信队列的消息进行人工处理。

2. 自定义重试配置与异常处理

(1)配置消费重试次数

application.yml中添加消费者的重试配置(也可通过注解属性配置):

# 消费者重试配置(可在注解中覆盖) rocketmq: consumer: # 消费线程数 consume-thread-max: 20 # 批量消费的最大消息数 consume-message-batch-max-size: 1 # 消费超时时间 consume-timeout: 15
(2)手动控制重试:返回消费状态

上述示例中,消费者实现的是RocketMQListener接口,无法手动控制消费状态,若需要自定义重试逻辑,可实现RocketMQPushConsumerListener接口(或使用MessageListenerConcurrently),返回ConsumeConcurrentlyStatus枚举:

package com.example.demo.consumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; import java.util.List; /** * 自定义重试逻辑的消费者 */ @Component @RocketMQMessageListener( consumerGroup = "demo-consumer-group-retry", topic = "demo-topic", selectorExpression = "*" ) public class RetryRocketMQConsumerService implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt messageExt) { String message = new String(messageExt.getBody()); System.out.println("接收到消息(带重试逻辑):" + message + ",重试次数:" + messageExt.getReconsumeTimes()); try { // 模拟业务处理失败 int a = 1 / 0; // 业务处理成功,无需重试 } catch (Exception e) { System.err.println("消息消费失败:" + e.getMessage()); // 若重试次数超过 3 次,直接返回成功(不再重试),否则抛出异常触发重试 if (messageExt.getReconsumeTimes() >= 3) { System.out.println("消息重试次数已达上限,不再重试"); // 可将消息记录到数据库,后续人工处理 return; } // 抛出异常,触发重试 throw new RuntimeException("消费失败,触发重试"); } } @Override public void prepareStart(org.apache.rocketmq.client.consumer.DefaultMQPushConsumer consumer) { // 可自定义消费者配置,如设置重试次数 try { // 设置消费线程数 consumer.setConsumeThreadMax(20); // 设置消息监听器(若需要更细粒度的控制) consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { onMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); } catch (MQClientException e) { throw new RuntimeException(e); } } }

3. 死信队列处理

当消息进入死信队列后,可创建专门的消费者监听死信队列,进行人工干预处理:

package com.example.demo.consumer; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 死信队列消费者 * 死信队列名称:%DLQ% + 原消费者组名 */ @Component @RocketMQMessageListener( consumerGroup = "dlq-consumer-group", topic = "%DLQ%demo-consumer-group", selectorExpression = "*" ) public class DlqRocketMQConsumerService implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("接收到死信队列消息:" + message); // 人工处理逻辑,如记录日志、通知运维、手动重试等 } }

六、测试验证

1. 启动项目

运行 Spring Boot 项目的主类,确保 RocketMQ NameServer 和 Broker 已启动。

2. 发送消息测试

通过 HTTP 接口发送消息,例如:

  • 同步发送:http://localhost:8080/message/sync/send?msg=Hello RocketMQ Sync
  • 异步发送:http://localhost:8080/message/async/send?msg=Hello RocketMQ Async
  • 单向发送:http://localhost:8080/message/oneway/send?msg=Hello RocketMQ OneWay

3. 验证消费与重试

观察控制台输出,可看到消费者成功接收消息;若在消费端模拟业务异常(如除以 0),可看到消息重试的日志,重试次数达到上限后进入死信队列。

七、生产环境注意事项

  1. 主题与消费者组规划:提前手动创建主题(关闭自动创建),消费者组名需唯一且有明确的业务含义。
  2. 消息重试配置:根据业务场景调整重试次数和间隔,避免无效重试占用资源。
  3. 死信队列处理:建立死信队列的监控和处理机制,防止消息丢失。
  4. 消息幂等性:由于消息重试,消费端需保证幂等性(如通过消息 key 去重)。
  5. 监控与告警:接入 RocketMQ 监控平台(如 RocketMQ Dashboard),监控消息发送 / 消费情况,设置告警机制。
  6. 集群部署:生产环境中 RocketMQ 需采用集群部署,保证高可用。

八、总结

本文详细介绍了 Spring Boot 整合 RocketMQ 的全流程,包括环境搭建、依赖配置、消息发送(同步 / 异步 / 单向)、消息消费(集群 / 广播、并发 / 顺序)、消息重试与死信队列处理,并提供了完整的代码示例。通过本文的实战内容,你可以快速掌握 RocketMQ 在 Spring Boot 项目中的核心使用方式,并根据实际业务场景进行扩展和优化。RocketMQ 的功能远不止于此,后续还可深入学习顺序消息、事务消息、延迟消息等高级特性,进一步满足复杂的业务需求。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/25 2:11:02

RocketMQ 集群部署指南:单 Master、多 Master 多 Slave 架构搭建与配置优化

在分布式系统中&#xff0c;消息中间件扮演着“通信枢纽”的关键角色&#xff0c;负责实现服务间的解耦、异步通信与流量削峰。RocketMQ 作为阿里开源的高性能消息中间件&#xff0c;凭借其高吞吐量、低延迟、高可靠性等特性&#xff0c;被广泛应用于各类大型分布式系统中。集群…

作者头像 李华
网站建设 2026/3/26 5:47:02

RAG教程看了 100 篇,为什么还是做不好?

RAG教程满天飞。随便搜一下&#xff0c;“手把手教你搭建RAG”、“10分钟跑通RAG”、“RAG最佳实践”……看起来很简单对吧&#xff1f; 但真正上手就会发现&#xff1a;教程里的demo跑得飞起&#xff0c;换成自己的文档就拉胯。 为什么&#xff1f; 因为大多数教程在教你怎么跑…

作者头像 李华
网站建设 2026/3/25 23:44:01

前端知识,什么是BFC?,零基础入门到精通,收藏这篇就够了

什么是BFC&#xff1f; BFC全称是Block Formatting Context&#xff0c;即块格式化上下文。它是CSS2.1规范定义的&#xff0c;关于CSS渲染定位的一个概念。要明白BFC到底是什么&#xff0c;首先来看看什么是视觉格式化模型。视觉格式化模型 视觉格式化模型(visual formatting…

作者头像 李华
网站建设 2026/3/14 8:15:15

26、Unix 系统中编码压缩文件处理与软件安装指南

Unix 系统中编码压缩文件处理与软件安装指南 1. 编码与压缩文件的命令组合 在处理编码和压缩文件时,我们通常会使用不同的命令来进行编码/解码、打包/解包、压缩/解压缩以及归档/解归档操作。不过,很多时候我们可以将这些命令组合起来按顺序执行,这样能节省时间和精力。 …

作者头像 李华