news 2026/4/4 22:49:59

RabbitMQ高级特性----生产者确认机制

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RabbitMQ高级特性----生产者确认机制

题记:在Java微服务开发中,对于一个功能需要调用另一个服务下的功能才能实现的情况,我们通常会使用异步调用取代同步调用,进而实现增强业务的可拓展性和实现故障隔离以及流量削峰填谷的目的。而消息队列就是异步调用的解决方案之一。不过在使用消息队列实现异步调用的时候,可能会出现消息无法传递到位进而导致业务信息出现差异的情况,因此消息的传递的可靠性就显得尤为重要。

传递消息的流程:

要保障消息传递的可靠性,我们可以从消息队列的每一步分析,以RabbitMQ为例,其发送消息的流程大致如下图所示:

不难发现消息传递一共会经历三名角色,分别是消息发送者,MQ和消息接收者。因此我们要保证消息成功传递并被正确处理就需要保证这三者的可靠性。

发送者可靠性:

1.1生产者重连

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

我们可以在消息发送者的yml配置文件中加入如下信息:

spring: rabbitmq: connection-timeout: 1s # 设置MQ的连接超时时间 template: retry: enabled: true # 开启超时重试机制,默认时关闭的 initial-interval: 1000ms # 失败后的初始等待时间 multiplier: 2 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier max-attempts: 3 # 最大重试次数

我们可以在消息发送者工程下创建一个Test模拟向rabbitMQ发送消息(在此之前我们需要把rabbiitMQ关闭或是断掉网络)

@Test public void TestTimeOutPublish(){ rabbitTemplate.convertAndSend("simple.queue","Hello,world"); }

代码执行后会是这样的效果:

通过日志我们可以观察到一共重连了三次,与我们在yml文件中配置的max-attempt属性一致。需要注意的是,这里等待的时间还需要再加上connect-timeout这一判定连接超时的配置。

1.2生产者确认机制

在保证网络畅通并且RabbitMQ服务正确启动了的前提下,我们就可以成功将消息发送到MQ中了。

不过仍有少数情况下会出现消息进入mq后丢失的情况:

  • 无法找到指定的exchange,大部分情况下就是交换机名称出错
  • exchange无法正确路由到queue,可能是routeKey错误导致的

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

当我们在yml文件中增加了相应的配置后流程就变得如下图所示:

配置完成之后:

  • 对于所有成功投递如Mq中的消息都会返回Ack,表示投递成功。
  • 对于投递成功但路由失败的消息,会返回Publish Return并返回Ack
  • 除此之外的所有消息都返回NAck,表示消息投递失败

需要注意的是,对于临时消息而言是进入队列Publisher Comfirm就是返回Ack,如果是持久消息则需要写入磁盘后才是返回Ack。

实现方式如下:

publisher消息发送者yml配置文件中加入:

spring: rabbitmq: publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型 publisher-returns: true # 开启publisher return机制

pubulisher-confirm-type一共有三种属性提供选择:

  1. none:默认选项,关闭
  2. correlated:异步回调返回回执
  3. simple:同步阻塞等待mq回执

并且我们还需要在配置类中配置confrim Return报文:

public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct //在注入rabbitTemplate依赖后执行 public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error("触发return callback,"); log.debug("exchange: {}", returned.getExchange()); log.debug("routingKey: {}", returned.getRoutingKey()); log.debug("message: {}", returned.getMessage()); log.debug("replyCode: {}", returned.getReplyCode()); log.debug("replyText: {}", returned.getReplyText()); } }); } }

对于不同的业务我们有不同的处理方案,因此CallbackComfirm需要在每次发送方法前定义,并且由作为convertMessage方法的参数,如下所示:

@Test void testPublisherConfirm() { // 1.创建CorrelationData CorrelationData cd = new CorrelationData(); // 2.给Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // 2.1.Future发生异常时的处理逻辑,基本不会触发 log.error("send message fail", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容 if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执 log.debug("发送消息成功,收到 ack!"); }else{ // result.getReason(),String类型,返回nack时的异常描述 log.error("发送消息失败,收到 nack, reason : {}", result.getReason()); } } }); // 3.发送消息 rabbitTemplate.convertAndSend("simple.direct", "simple", "hello,RabbiteMQ", cd); }

我们可以在onSuccess和onFailure中编写我们对这一消息发送成功以及失败情况的对应处理。

如果关于上述有其他更好的建议以及疑问,欢迎留言,我将尽快回复。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/4 16:31:52

RabbitMQ 客户端 连接、发送、接收处理消息

RabbitMQ 客户端 连接、发送、接收处理消息 一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样 RabbitMQ 服务&#xff0c;不是像其他服务器一样&#xff0c;负责逻辑处理&#xff0c;然后转发给客户端 而是所有客户端想要向 RabbitMQ服务发送消息&#xff0c; 第一步&…

作者头像 李华
网站建设 2026/3/25 16:05:59

通信协议仿真:通信协议基础_(9).通信协议仿真案例分析

通信协议仿真案例分析 在上一节中&#xff0c;我们介绍了通信协议的基础知识&#xff0c;包括通信协议的定义、分类以及重要性。本节将通过具体的案例分析&#xff0c;深入探讨通信协议仿真的实际应用和实现方法。我们将从简单的串行通信协议开始&#xff0c;逐步分析更复杂的网…

作者头像 李华
网站建设 2026/4/3 20:40:01

autosar软件开发中诊断协议栈配置实践案例

AUTOSAR诊断协议栈配置实战&#xff1a;从UDS服务到DTC管理的全链路解析在一辆现代智能汽车中&#xff0c;当你用诊断仪读取一个故障码、刷新ECU程序&#xff0c;或是远程获取车辆实时数据时——背后支撑这一切的&#xff0c;正是AUTOSAR架构中的诊断通信协议栈。它不仅是连接整…

作者头像 李华
网站建设 2026/4/3 6:48:44

MPC5634 Bootloader

MPC5634 Bootloader嵌入式工程师最怕遇到设备变砖&#xff0c;而好的Bootloader设计就是咱们的救命稻草。今天咱们来盘一盘飞思卡尔MPC5634这颗工业级控制器的Bootloader实现&#xff0c;直接上干货不啰嗦。先说启动流程&#xff0c;这货上电先执行0x00地址的启动代码。来看关键…

作者头像 李华
网站建设 2026/3/19 6:36:27

【大模型越狱】【ICML2025】Weak-to-Strong Jailbreaking on Large Language Models

Abstract 大型语言模型(LLM)容易受到越狱攻击,导致生成有害、不道德或有偏见的内容。然而,现有的越狱方法计算成本高昂。本文提出了一种高效的推理时攻击方法——弱到强(weak-to-strong)越狱攻击,用于诱导对齐后的LLM生成有害文本。我们的核心观察是:越狱模型与安全模…

作者头像 李华