news 2026/5/29 21:29:21

RabbitMQ 事务与消息分发

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RabbitMQ 事务与消息分发

RabbitMQ 事务与消息分发详解

在使用 RabbitMQ 构建异步系统时,消息能不能可靠发送、消费者能不能稳定处理、多个消费者之间能不能合理分担压力,都会直接影响系统的稳定性。本文围绕 RabbitMQ 的事务机制和消息分发机制展开,重点说明它们解决的问题、核心配置方式以及在实际业务中的使用思路。

RabbitMQ 事务机制

RabbitMQ 基于 AMQP 协议实现,而 AMQP 协议本身支持事务机制,因此 RabbitMQ 也提供了事务能力。事务的核心作用是保证消息发送或接收过程具备原子性:要么全部成功,要么全部失败。

在 Spring AMQP 中,也可以通过事务管理器配合RabbitTemplate来完成事务控制。典型场景是:业务代码中连续发送多条消息,如果中间出现异常,希望前面已经执行的消息发送操作也能够回滚,避免系统进入半成功状态。

配置事务管理器

在 Spring Boot 项目中,可以通过RabbitTransactionManager开启 RabbitMQ 事务支持。同时,需要将RabbitTemplatechannelTransacted设置为true,表示该模板使用事务信道发送消息。

importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.transaction.RabbitTransactionManager;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassTransactionConfig{@BeanpublicRabbitTransactionManagertransactionManager(CachingConnectionFactoryconnectionFactory){returnnewRabbitTransactionManager(connectionFactory);}@BeanpublicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);returnrabbitTemplate;}}

这里有两个关键点:

  1. RabbitTransactionManager用来管理 RabbitMQ 事务。
  2. rabbitTemplate.setChannelTransacted(true)表示发送消息时启用事务信道。

如果只配置事务管理器,却没有让RabbitTemplate使用事务信道,消息发送过程并不会真正受到事务控制。

声明事务测试队列

为了验证事务效果,可以先声明一个普通的持久化队列:

@Bean("transQueue")publicQueuetransQueue(){returnQueueBuilder.durable("trans_queue").build();}

该队列用于接收事务发送的消息。

生产者发送消息

生产者中使用@Transactional注解开启事务控制。下面的示例中,第一条消息发送之后,代码主动制造了一个异常,因此第二条消息不会执行。

importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.transaction.annotation.Transactional;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RequestMapping("/trans")@RestControllerpublicclassTransactionProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;@Transactional@RequestMapping("/send")publicStringsend(){rabbitTemplate.convertAndSend("","trans_queue","trans test 1...");inta=5/0;rabbitTemplate.convertAndSend("","trans_queue","trans test 2...");return"发送成功";}}

如果不加@Transactional,第一条消息已经发送到 RabbitMQ,后续代码即使抛出异常,也不会影响这条消息。

如果加上@Transactional,当方法执行过程中出现异常时,整个消息发送过程会回滚。也就是说,第一条消息和第二条消息都不会真正发送成功。

事务机制适合解决什么问题

RabbitMQ 事务机制适合用在对一致性要求较高的场景。例如:

  1. 一个业务动作需要连续发送多条消息,必须保证这些消息同时成功或同时失败。
  2. 消息发送逻辑和本地业务逻辑需要放在同一个事务流程中控制。
  3. 某些关键链路不能接受“前一条消息发送成功、后一条消息发送失败”的中间状态。

不过,事务机制会带来额外性能开销。因为事务需要等待提交或回滚结果,吞吐量会受到影响。在高并发消息发送场景中,更常见的做法是使用发送方确认机制来保证消息投递可靠性;事务更适合对一致性要求更强、吞吐要求相对没那么极端的业务。

RabbitMQ 消息分发机制

当一个队列绑定多个消费者时,RabbitMQ 会把队列中的消息分发给不同消费者处理。每条消息只会发送给订阅该队列的某一个消费者。这种模式非常适合横向扩展:当业务压力变大时,可以增加更多消费者实例来提升整体消费能力。

RabbitMQ 默认使用轮询方式分发消息。也就是说,消息会按照顺序依次分配给不同消费者,而不会主动判断消费者当前是否忙碌、处理速度是否足够快。

这种默认方式在消费者处理能力相近时问题不大。但如果某些消费者处理速度很慢,另一些消费者处理速度很快,就可能出现不均衡现象:慢消费者手里堆着大量未确认消息,快消费者却处于空闲状态,最终导致整体吞吐量下降。

要解决这个问题,可以使用channel.basicQos(int prefetchCount)控制消费者一次最多能够持有多少条未确认消息。

prefetchCount 的作用

prefetchCount用来限制消费者预取消息的数量。RabbitMQ 会为消费者维护一个未确认消息计数:

  1. 每发送一条消息给消费者,计数加一。
  2. 消费者确认一条消息后,计数减一。
  3. 当未确认数量达到prefetchCount上限时,RabbitMQ 暂停继续向该消费者推送消息。
  4. 等消费者确认已有消息后,RabbitMQ 才会继续发送新消息。

这种机制类似滑动窗口,可以让 RabbitMQ 根据消费者的处理进度控制消息流量。

需要注意的是,prefetchCount设置为0表示没有上限。另外,它主要针对推模式消费生效,对拉模式消费无效。

消费端限流

限流是prefetchCount最常见的使用场景之一。

假设订单系统正常每秒最多处理 5000 个请求,但在秒杀活动中,请求量瞬间上涨到每秒 10000 个。如果所有请求都通过 MQ 一次性推送给订单服务,订单服务很可能会被压垮。

这时可以使用 RabbitMQ 的消费端限流机制,让消费者一次只拉取固定数量的消息。这样即使队列中瞬间堆积大量请求,消费者也会按照自身处理能力逐批处理。

配置手动确认和 prefetch

使用消费端限流时,需要配合手动确认模式。因为 RabbitMQ 判断消费者是否还能继续接收消息,依赖的是“未确认消息数量”。

spring:rabbitmq:listener:simple:acknowledge-mode:manualprefetch:5

上面的配置表示:每个消费者最多同时持有 5 条未确认消息。

配置交换机和队列

可以声明一个直连交换机、一个队列,并通过路由键完成绑定。

importcom.bite.rabbitmq.constant.Constant;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Exchange;importorg.springframework.amqp.core.ExchangeBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.QueueBuilder;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassQosConfig{@Bean("qosExchange")publicExchangeqosExchange(){returnExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build();}@Bean("qosQueue")publicQueueqosQueue(){returnQueueBuilder.durable(Constant.QOS_QUEUE).build();}@Bean("qosBinding")publicBindingqosBinding(@Qualifier("qosExchange")Exchangeexchange,@Qualifier("qosQueue")Queuequeue){returnBindingBuilder.bind(queue).to(exchange).with("qos").noargs();}}

一次发送多条消息

为了观察限流效果,可以一次性发送 20 条消息:

@RequestMapping("/qos")publicStringqos(){for(inti=0;i<20;i++){rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME,"qos","qos test..."+i);}return"发送成功";}

消费者监听队列

消费者监听队列后,先打印消息内容和deliveryTag。如果暂时不调用basicAck,就可以观察到 RabbitMQ 的限流效果。

importcom.bite.rabbitmq.constant.Constant;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassQosQueueListener{@RabbitListener(queues=Constant.QOS_QUEUE)publicvoidlistenerQueue(Messagemessage,Channelchannel)throwsException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息: %s, deliveryTag: %d%n",newString(message.getBody(),"UTF-8"),deliveryTag);// 手动确认后,RabbitMQ 才会继续向该消费者推送新的消息// channel.basicAck(deliveryTag, true);}}

prefetch: 5的配置下,如果消费者没有确认消息,控制台最多只会打印 5 条消息。此时管理界面中可以看到:还有 15 条消息处于 Ready 状态,5 条消息处于 Unacked 状态。

如果去掉prefetch: 5配置,消费者可能会一次性收到全部 20 条消息。对于处理能力有限的服务来说,这就很容易形成瞬时压力。

更合理的消费者负载分配

prefetchCount不仅可以限流,还可以让多个消费者之间的消息分配更加合理。

默认轮询分发只关心“该轮该发给谁”,不关心消费者是否已经处理完上一条消息。因此,当两个消费者处理速度不同,一个很快、一个很慢时,慢消费者仍然可能持续拿到新消息,导致它手里的任务越积越多。

一种常见优化方式是将prefetch设置为1

spring:rabbitmq:listener:simple:acknowledge-mode:manualprefetch:1

这表示 RabbitMQ 每次只给消费者分配一条未确认消息。在消费者确认当前消息之前,不会继续给它推送新消息,而是把消息分配给其他空闲消费者。

启动两个消费者观察效果

下面的代码模拟两个消费者,其中第二个消费者通过Thread.sleep(100)模拟较慢的处理速度。

importcom.bite.rabbitmq.constant.Constant;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassQosQueueListener{@RabbitListener(queues=Constant.QOS_QUEUE)publicvoidlistenerQosQueue(Messagemessage,Channelchannel)throwsException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();System.out.printf("消费者1接收到消息: %s, deliveryTag: %d%n",newString(message.getBody(),"UTF-8"),deliveryTag);channel.basicAck(deliveryTag,true);}@RabbitListener(queues=Constant.QOS_QUEUE)publicvoidlistenerQueue2(Messagemessage,Channelchannel)throwsException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n",newString(message.getBody(),"UTF-8"),deliveryTag);Thread.sleep(100);channel.basicAck(deliveryTag,true);}}

当发送多条消息后,可以看到处理速度快的消费者会消费更多消息,而处理速度慢的消费者不会被持续塞满任务。这样整体消费能力会更加接近真实处理能力,而不是被固定轮询策略限制。

另外,日志中可能会看到两个消费者的deliveryTag出现重复。这是正常现象,因为deliveryTag是在每个 Channel 内部独立递增的,不同 Channel 之间并不共享同一个计数器。

总结

RabbitMQ 的事务机制主要用于保证消息操作的原子性。在 Spring AMQP 中,可以通过RabbitTransactionManager、事务信道和@Transactional配合使用,让消息发送过程支持回滚。不过事务会影响性能,因此更适合一致性要求较高的场景。

消息分发机制则关注消费者之间如何分担消息。RabbitMQ 默认采用轮询分发,简单直接,但在消费者处理速度不一致时可能造成任务堆积。通过prefetchCount可以限制消费者持有的未确认消息数量,从而实现消费端限流,也可以让多个消费者之间的任务分配更加公平。

在实际项目中,可以按照下面的思路选择:

  1. 需要保证多条消息同时成功或失败时,考虑使用事务。
  2. 需要保护下游服务,避免瞬时流量压垮消费者时,配置手动确认和合理的prefetch
  3. 多个消费者处理能力不一致时,可以设置prefetch: 1,让 RabbitMQ 优先把消息分配给更空闲的消费者。
  4. 对性能要求很高的消息发送链路,应谨慎使用事务,并结合发送方确认机制设计可靠投递方案。

理解事务和消息分发后,RabbitMQ 就不只是一个简单的消息中转站,而是可以在可靠性、吞吐量和消费者负载之间进行细粒度调节的消息系统。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/29 21:22:10

从零自制直流电机:电磁原理与动手实践详解

1. 项目概述&#xff1a;亲手造一个会转的“魔法”如果你对身边那些会转的东西感到好奇——比如风扇的叶片、玩具车的轮子&#xff0c;或者电动牙刷的刷头——那么你很可能已经对直流电机产生了兴趣。这玩意儿不是什么遥不可及的高科技&#xff0c;它本质上就是电磁学原理最直观…

作者头像 李华
网站建设 2026/5/29 21:21:24

163MusicLyrics:一键获取多平台音乐歌词的终极指南

163MusicLyrics&#xff1a;一键获取多平台音乐歌词的终极指南 【免费下载链接】163MusicLyrics 云音乐歌词获取处理工具【网易云、QQ音乐】 项目地址: https://gitcode.com/GitHub_Trending/16/163MusicLyrics 在数字音乐时代&#xff0c;你是否曾为了找到一首歌的完整…

作者头像 李华