news 2026/5/14 19:19:28

Java-212 RabbitMQ 消息可靠性进阶:Publisher Confirms、mandatory Return、持久化与幂等落地

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java-212 RabbitMQ 消息可靠性进阶:Publisher Confirms、mandatory Return、持久化与幂等落地

TL;DR

  • 场景:支付/充值等需要最终一致性的链路,用 RabbitMQ 做异步解耦但必须可追责不丢单。
  • 结论:Confirm 解决“Broker 收到”,mandatory+Return 解决“路由失败可见”,持久化+幂等兜底“宕机/重投/重复”。
  • 产出:同步 Confirm、批量 Confirm、异步 Confirm 三套 Java 模板 + 一张常见故障速查卡。

RabbitMQ 高级特性

消息可靠性

一般我们使用支付宝或者微信转账的时候,都是扫码支付,然后立刻得到结果,说你支付了多少多少钱,如果你绑定的是卡,可能这个时候你并没有收到支付的确认消息,往往是过了一段时间之后,你会收到发来的短信,告诉你支付的信息。

支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。

分布式锁

这个比较容易理解,就是操作某条数据的时候先锁定,可以用Redis、ZooKeeper等来实现。在我们修改订单的时候,先锁定该账单,如果该账单有并发操作,后面的操作只有等待上一个操作的锁释放后再依次执行。

  • 优点:能够保证数据的强一致性。
  • 缺点:高并发场景下可能有性能问题。

消息队列

消息队列是为了保证最终一致性,我们需要确保消息队列有ACK机制,客户端收到消息并消费处理之后,客户端发送ACK消息给消息中间件,如果中间件超过指定时间还没有收到ACK消息,则定时去重发消息。

比如我们在用户充值完成之后,会发送充值消息给账户系统,账户系统再去更改账户余额。
优点:异步、高并发
缺点:有一定延时,数据弱一致性,并且必须能够保证该业务操作肯定能够成功,不能失败。

如何保证

我们从以下几个方面来保证消息的可靠性:

  • 客户端代码中的异常捕获,包括生产者和消费者
  • AMQP、RabbitMQ 的事务机制
  • 发送端确认机制
  • 消息持久化机制
  • Broker端的高可用集群
  • 消费者确认机制
  • 消费端限流
  • 消息幂等性

异常捕获

先执行业务操作,业务操作成功后执行消息发送,消息发送的过程用try catch进行捕获,在一场处理的代码中执行回滚业务或者执行重复操作等,这是一种最大努力的确保方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

另外,可以通过 spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试。

事务机制

AMQP、RabbitMQ的事务机制,没有捕获到异常不能代表消息就一定投递成功了。
一直到事务提交之后都没有异常,说明消息确实投递成功了,但是,这种方式在性能方面开销比较大,一般也不推荐使用。

发送确认

RabbitMQ 后来引入了一种轻量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上发布的消息都会被拍成一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息也会在消息持久化后发出),RabbitMQ会发送一个确认Basic.Ack给生产者,包含消息的唯一ID,这样生产者就知道消息已经正确的送达了。

RabbitMQ 回传给生产者的确认消息中的 deliveryTag 字段包含了确认消息的序号,另外,通过设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息是否都已经得到了处理。
生产者投递消息后并不需要一直阻塞着,可以继续投递下一个消息并通过回调的方式处理ACK响应。
如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该Nack命令。

/** * Publisher Confirms(同步等待确认)示例 * * 语义: * 1) confirmSelect():把 Channel 切换到 confirm 模式(broker 会对 publish 做 ack/nack) * 2) waitForConfirmsOrDie(timeout):阻塞等待本批消息的 confirm * - 全部 ack:正常返回 * - 任意 nack / channel 关闭:抛 IOException * - 超时:抛 TimeoutException * * 注意: * - Confirm 只保证“broker 收到了并落到交换机层面”,不保证“路由到了队列” * - 要保证“路由不到队列也能感知”,需要 basicPublish 的 mandatory=true + ReturnListener/ReturnCallback */publicclassPublisherConfirmsDemo{privatestaticfinalStringEXCHANGE="ex.publisher.confirms";privatestaticfinalStringQUEUE="q.publisher.confirms";privatestaticfinalStringROUTING_KEY=QUEUE;publicstaticvoidmain(String[]args)throwsException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("node1");factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");factory.setPort(5672);// try-with-resources:确保连接/通道被关闭,避免泄漏try(Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel()){// 1) 开启 publisher confirms(必须在 publish 前调用)channel.confirmSelect();// 2) 声明拓扑(demo 场景放在代码里;生产一般由运维/启动时统一声明)channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT,false,false,null);// 生产环境更常用 durable=true;这里按你原始参数保留 falsechannel.queueDeclare(QUEUE,false,false,false,null);channel.queueBind(QUEUE,EXCHANGE,ROUTING_KEY);// 3) 开启 mandatory:路由不到队列时 broker 会 Return 回来(否则静默丢弃)channel.addReturnListener((replyCode,replyText,exchange,routingKey,properties,body)->{Stringreturned=newString(body,StandardCharsets.UTF_8);System.err.printf("Return: code=%d text=%s ex=%s rk=%s body=%s%n",replyCode,replyText,exchange,routingKey,returned);});Stringmessage="hello";// 4) 发布消息// mandatory=true:保证路由失败能被感知(配合 ReturnListener)channel.basicPublish(EXCHANGE,ROUTING_KEY,true,// mandatorynull,// props: 可在这里设置 deliveryMode(持久化)、headers 等message.getBytes(StandardCharsets.UTF_8));// 5) 同步等待确认(最简单,但吞吐低;高吞吐用异步 ConfirmListener)try{channel.waitForConfirmsOrDie(5_000);System.out.println("Confirm ACK:message = "+message);}catch(TimeoutExceptione){// broker 在超时时间内没回确认(网络抖动、broker 压力大等)System.err.println("Confirm 超时:message = "+message);throwe;}catch(InterruptedExceptione){// 线程被中断;通常需要恢复中断标记Thread.currentThread().interrupt();System.err.println("等待 Confirm 被中断:message = "+message);throwe;}catch(IOExceptione){// 任意 nack 或通道异常关闭,都会进这里System.err.println("Confirm NACK/通道异常:message = "+message);throwe;}}}}

waitForConfirm 方法有个重载的,可以自定义 timeout 超时时间,超时会抛出 TimeoutException。类似的有:waitForConfirmsOrDie方法,Broker端在返回 Nack(Basic.Nack)之后该方法会抛出 io.IOException。
需要根据异常类型来区别处理,TimeoutException 超时是属于第三状态(不论确定成功还是失败),而返回 Basic.Nack 抛出 IOException 这种明确的失败。
刚才的代码主要是演示 confirm 机制,实际上还是同步阻塞的,性能并不是太好。

实际上,我们可以通过批处理的方式来改善整体的性能(即批量发送消息仅调用一次 waitForConfirms)。
正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者nack(失败)后那就需要批量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消息投递没成功,但是不知道是哪条失败了,所以很难针对性的处理)。
如此看来,批量重发肯定会造成部分数据的重复,另外,我们可以通过异步回调的方式来处理Broker的响应。
addConfirmListener 方法可以添加 ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含:

  • handleAck:RabbitMQ回传的Basic.Ack
  • handleNack:RabbitMQ回传的Baisc.Nack

原生API批处理

/** * Publisher Confirms:批量确认(batch confirms) * * 核心思路: * - 连续 publish N 条消息(batchSize) * - 每发满一批,就 waitForConfirmsOrDie() 阻塞等待这批全部 ack/nack * - 循环结束后,把最后不足一批的 outstanding 再 confirm 一次 * * 特性/边界: * - 性能比“每条都 confirm”高,但仍是同步阻塞模型 * - 一旦 waitForConfirmsOrDie 抛异常,你只知道“这批里至少有失败”,无法精确到是哪一条 * 精确定位要用异步 confirm + publish sequence number 映射(ConfirmListener/异步回调) */publicclassBatchPublisherConfirmsDemo{privatestaticfinalStringEXCHANGE=EX_PUBLISHER_CONFIRMS;privatestaticfinalStringQUEUE=QUEUE_PUBLISHER_CONFIRMS;privatestaticfinalStringROUTING_KEY=QUEUE_PUBLISHER_CONFIRMS;publicstaticvoidpublishBatch(ConnectionFactoryfactory)throwsException{// 关闭资源:避免连接/通道泄漏try(Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel()){// 1) 开启 confirm 模式channel.confirmSelect();// 2) 声明拓扑(demo 场景写在代码里)channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT,false,false,null);channel.queueDeclare(QUEUE,false,false,false,null);channel.queueBind(QUEUE,EXCHANGE,ROUTING_KEY);// 3) 批量参数finalintbatchSize=10;// 每批确认多少条finalinttotalMessages=102;// 总消息数intoutstanding=0;// 当前批次已发送但未确认的数量for(inti=0;i<totalMessages;i++){// 每条消息带上序号,便于日志定位(即使这里用的是批量确认)Stringbody="hello-"+i;// publish(这里不启 mandatory;如果要覆盖“路由不到队列”请改为 mandatory=true + ReturnListener)channel.basicPublish(EXCHANGE,ROUTING_KEY,null,body.getBytes(StandardCharsets.UTF_8));outstanding++;// 4) 满一批就阻塞等待 broker 确认if(outstanding==batchSize){waitBatchConfirmOrThrow(channel,5_000);System.out.println("批消息确认:size="+batchSize);outstanding=0;}}// 5) 处理最后不足一批的尾巴if(outstanding>0){waitBatchConfirmOrThrow(channel,5_000);System.out.println("批消息确认:tailSize="+outstanding);}}}privatestaticvoidwaitBatchConfirmOrThrow(Channelchannel,longtimeoutMs)throwsIOException,InterruptedException,TimeoutException{try{channel.waitForConfirmsOrDie(timeoutMs);}catch(InterruptedExceptione){Thread.currentThread().interrupt();// 恢复中断标记throwe;}}}

还可以使用回调方法:

/** * Publisher Confirms:异步确认(最高吞吐、可精确定位哪条消息 ack/nack) * * 核心机制: * - 每次 publish 都有一个递增的 publishSeqNo(channel 级别序列号) * - broker 异步返回 ack/nack: * - sequenceNumber:确认到的序列号 * - multiple=true:表示 <= sequenceNumber 的全部都被 ack/nack(批量确认) * * 实现策略: * - outstandingConfirms:用 seqNo -> message 的映射保存“已发送但未确认”的消息 * - ack 回调:从 map 中删除对应 seqNo(multiple 则清空 headMap) * - nack 回调:删除并记录失败的消息(multiple 则取出 headMap 做失败处理) * * 注意: * - confirm 只保证“broker 收到并处理了 publish”,不保证一定路由到队列。 * 要覆盖“路由失败”,需要 basicPublish mandatory=true + ReturnListener。 */publicclassAsyncPublisherConfirmsDemo{privatestaticfinalStringEXCHANGE=EX_PUBLISHER_CONFIRMS;privatestaticfinalStringQUEUE=QUEUE_PUBLISHER_CONFIRMS;privatestaticfinalStringROUTING_KEY=QUEUE_PUBLISHER_CONFIRMS;publicstaticvoidpublishAsync(ConnectionFactoryfactory)throwsException{try(Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel()){// 1) 启用 confirm 模式(必须先调用)channel.confirmSelect();// 2) 声明拓扑(demo:非持久化)channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT,false,false,null);channel.queueDeclare(QUEUE,false,false,false,null);channel.queueBind(QUEUE,EXCHANGE,ROUTING_KEY);// 3) 保存“未确认消息”:seqNo -> payloadConcurrentNavigableMap<Long,String>outstanding=newConcurrentSkipListMap<>();// 4) ACK 回调:清理已确认的消息ConfirmCallbackackCallback=(seqNo,multiple)->{if(multiple){// <= seqNo 的全部都 ack 了ConcurrentNavigableMap<Long,String>head=outstanding.headMap(seqNo,true);head.clear();}else{outstanding.remove(seqNo);}};// 5) NACK 回调:清理并记录失败的消息(这里只打印;生产一般做重试/落库)ConfirmCallbacknackCallback=(seqNo,multiple)->{if(multiple){// <= seqNo 的全部都 nack 了(或至少 broker 表示这批失败)ConcurrentNavigableMap<Long,String>head=outstanding.headMap(seqNo,true);head.forEach((k,v)->System.err.println("NACK: seqNo="+k+" msg="+v));head.clear();}else{Stringmsg=outstanding.remove(seqNo);System.err.println("NACK: seqNo="+seqNo+" msg="+msg);}};// 6) 注册 confirm 监听器(异步)channel.addConfirmListener(ackCallback,nackCallback);// (可选)路由失败可见性:mandatory + ReturnListener// channel.addReturnListener((replyCode, replyText, ex, rk, props, body) ->// System.err.printf("Return: code=%d text=%s ex=%s rk=%s body=%s%n",// replyCode, replyText, ex, rk, new String(body, StandardCharsets.UTF_8)));// boolean mandatory = true;// 7) 高速发送:先拿到 seqNo,再 publish,再 put 到 outstanding// 顺序必须是:getNextPublishSeqNo -> basicPublish -> outstanding.put// 因为 seqNo 属于“即将发送的这条消息”Stringprefix="hello-";inttotal=1000;for(inti=0;i<total;i++){Stringpayload=prefix+i;longseqNo=channel.getNextPublishSeqNo();channel.basicPublish(EXCHANGE,ROUTING_KEY,null,payload.getBytes(StandardCharsets.UTF_8)// 如果启用 mandatory:把上面 null 换成 (mandatory, props, body) 的重载);outstanding.put(seqNo,payload);}// 8) 这里不阻塞等待(异步回调会持续清理 outstanding)// 如果你需要“确保全部确认后再退出”,可以自旋等待 outstanding 为空:// while (!outstanding.isEmpty()) { Thread.sleep(10); }}}}

持久化机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常的时候,比如断电、重启、停机等等,主要以下面的几个方面来保障消息的持久性:

  • Exchange的持久化,通过定义时设置durable参数为true来保证Exchange相关的元数据不丢失
  • Queue的持久化,也是通过定义设置durable参数为true来保证Queue相关的元数据不丢失
  • 消息的持久化,通过将消息投递模式(BasicProperties 中的 deliveryMode 属性)配置为2,即可实现消息持久化,保证消息自身不丢失。

RabbitMQ 中的持久化都需要写磁盘,当系统内存不够的时候,非持久化消息也会被刷到磁盘中,这些处理动作都是在持久层中完成的,持久层是一个逻辑上的概念,实际上包含两个部分:

  • 队列索引(rabbit_queue_index),负责维护 Queue 中消息的信息,包括消息的存储位置、是否已交给消费者、是否已被消费及ACK确认等等,每个Queue都有与之对应的 rabbit_queue_index。
  • 消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。

在 rabbit_home msg_stores vhosts/$VHostId 这个路径下包含 queues、msg_store_persistent、msg_store_transient 这3个目录,这是实机存储消息的位置。
其中 queues 目录中保持着 rabbit_queue_index 相关的数据,而 msg_store_persistent 保持着持久化的消息数据,msg_store_transient 保存着非持久化相关的数据。

另外,RabbitMQ 通过配置 queue_index_embed_msgs_below

错误速查

症状根因定位修复
“发送成功”,但队列里没有消息且无报错未启用 mandatory,路由不到队列被静默丢弃看是否注册 ReturnListener/ReturnCallback;检查交换机/路由键/绑定basicPublish mandatory=true + ReturnListener;或在 Spring AMQP 开启 publisher-returns + template.mandatory
waitForConfirmsOrDie 报 TimeoutExceptionBroker 压力大/网络抖动/磁盘 IO 慢导致确认迟到Broker 日志、连接 RTT、磁盘利用率;确认超时是否集中出现增大 timeout;改异步 Confirm;降低单通道并发/分通道;优化磁盘与队列类型并压测
waitForConfirmsOrDie 报 IOException(Nack/通道异常)Broker 返回 Nack 或 channel 被关闭客户端日志中是否有 Basic.Nack / channel shutdown;Broker 端错误日志对 Nack 做重试/落库补偿;排查 broker 内部错误与资源耗尽;必要时熔断降级
批量 Confirm 失败后无法确认哪条消息失败批量确认只给“批次结果”,缺少 seqNo->消息映射是否仅使用 batch wait;是否没有 outstanding map需要精确定位时改异步 Confirm:getNextPublishSeqNo + outstanding 映射 + ack/nack 回调处理
Broker 重启/宕机后消息丢失Exchange/Queue 未 durable 或消息未持久化(deliveryMode!=2)检查 exchangeDeclare/queueDeclare 的 durable;消息 properties 是否设置持久化Exchange durable=true、Queue durable=true、消息 deliveryMode=2;并接受“持久化≠绝对不丢”,仍要补偿/对账
消费者重复消费/余额被加两次重投/至少一次投递语义下未做幂等业务侧是否有唯一键/去重表/幂等 token;是否存在重试与重复投递以业务唯一键做幂等(DB 唯一约束/去重表/状态机);把“扣减/入账”做成可重入
吞吐低、延迟高同步 confirm 每条阻塞等待线程栈/耗时在 waitForConfirmsOrDie;TPS 与 CPU 利用不匹配改批量 confirm 或异步 confirm;多通道并发;按链路压测选参数
磁盘占用异常/写放大明显持久化+高峰流量导致 msg_store/index 压力看 rabbitmq-diagnostics、磁盘 IO、队列积压、内存 watermark限流/削峰;优化队列模型与消息体;按业务拆分 vhost/队列;必要时调整内存/磁盘策略并压测

其他系列

🚀 AI篇持续更新中(长期更新)

AI炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用AI工具指南!
AI研究-132 Java 生态前沿 2025:Spring、Quarkus、GraalVM、CRaC 与云原生落地
🔗 AI模块直达链接

💻 Java篇持续更新中(长期更新)

Java-207 RabbitMQ Direct 交换器路由:RoutingKey 精确匹配、队列多绑定与日志分流实战
MyBatis 已完结,Spring 已完结,Nginx已完结,Tomcat已完结,分布式服务已完结,Dubbo已完结,MySQL已完结,MongoDB已完结,Neo4j已完结,FastDFS 已完结,OSS已完结,GuavaCache已完结,EVCache已完结,RabbitMQ正在更新… 深入浅出助你打牢基础!
🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!
大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT案例 详解
🔗 大数据模块直达链接

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/14 14:58:33

基于GPT-SoVITS构建虚拟主播语音系统的技术路径

基于GPT-SoVITS构建虚拟主播语音系统的技术路径 在直播与虚拟内容爆发式增长的今天&#xff0c;一个关键问题正摆在内容创作者面前&#xff1a;如何让虚拟主播真正“活”起来&#xff1f;不是靠预录语音循环播放&#xff0c;而是能实时回应弹幕、自然讲述故事、拥有独一无二的…

作者头像 李华
网站建设 2026/5/14 19:18:56

手把手教你部署Open-AutoGLM视频生成系统,10分钟快速上手不踩坑

第一章&#xff1a;Open-AutoGLM视频生成系统概述Open-AutoGLM 是一个基于生成式语言模型与扩散模型融合架构的开源视频生成系统&#xff0c;旨在实现从自然语言描述到高质量动态视频内容的端到端生成。该系统结合了文本理解、时序建模与多帧一致性优化技术&#xff0c;支持用户…

作者头像 李华
网站建设 2026/5/14 9:11:31

网页元素定位神器:5步教你轻松搞定页面元素查找难题

网页元素定位神器&#xff1a;5步教你轻松搞定页面元素查找难题 【免费下载链接】xpath-helper-plus 项目地址: https://gitcode.com/gh_mirrors/xp/xpath-helper-plus 还在为网页元素定位而苦恼吗&#xff1f;无论是前端开发、自动化测试还是数据采集&#xff0c;精准…

作者头像 李华
网站建设 2026/5/7 21:15:34

WebTopo:专业级Web拓扑图编辑器的深度技术解析与实践指南

WebTopo&#xff1a;专业级Web拓扑图编辑器的深度技术解析与实践指南 【免费下载链接】WebTopo 基于VUE的web组态&#xff08;组态&#xff0c;拓扑图&#xff0c;拓扑编辑器&#xff09; 项目地址: https://gitcode.com/gh_mirrors/we/WebTopo 在当今数字化浪潮中&…

作者头像 李华
网站建设 2026/5/14 9:05:07

Win10 on ARM固件分区结构分析:图解说明部署步骤

深入Win10 on ARM固件世界&#xff1a;从分区结构到系统部署的实战图解 你有没有遇到过这样的场景&#xff1f;手头有一块基于高通骁龙的ARM开发板&#xff0c;想刷个Windows 10&#xff0c;却发现传统x86那一套PE启动、DiskGenius分区的老办法完全失效——设备根本点不亮。这…

作者头像 李华
网站建设 2026/5/14 13:19:19

MANO手部模型实战指南:构建下一代3D交互应用的核心技术

MANO手部模型实战指南&#xff1a;构建下一代3D交互应用的核心技术 【免费下载链接】MANO A PyTorch Implementation of MANO hand model. 项目地址: https://gitcode.com/gh_mirrors/ma/MANO 你是否曾为3D手部建模的复杂性而困扰&#xff1f;传统方法需要繁琐的骨骼绑定…

作者头像 李华