news 2026/4/9 23:34:06

RabbitMQ投递回调机制以及策略业务补偿

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RabbitMQ投递回调机制以及策略业务补偿

在 RabbitMQ 中,生产者发送消息后,有可能遇到以下几种情况:

消息成功投递到交换机(Exchange)

消息未能成功投递到交换机(Exchange)

消息成功进入交换机但无法路由到队列(Queue)

如果生产者端没有回调确认机制,就可能出现严重的数据不一致:

举例: Redis 已经增加点赞数,但消息并未真正进入 MQ,数据库后续也无法更新,就出现了 “缓存超前、数据库缺失” 的问题。

为了解决这种问题,Spring AMQP 提供了:

RabbitTemplate.setConfirmCallback()

RabbitTemplate.setReturnsCallback()

来捕获和处理消息投递的成功与失败。

但是在复杂系统中,不同的业务消息(例如“下单”、“扣库存”、“发积分”)在投递失败时,需要采取不同的补偿逻辑。

弊端:如果你只写一份大而全的回调逻辑,代码就会充满大量的 if else 判断,非常难维护。

二、策略模式思想引入

策略模式的核心思想是:定义一系列算法(或行为),让它们可以相互替换,且算法的变化不会影响使用算法的客户。

“算法” ≈ “不同的消息回调处理逻辑”

“客户” ≈ “RabbitTemplate 的 ConfirmCallback 回调”

操作:通过(根据业务抽象)接口 + Map 注入,在运行时动态选择。

代码实现

1、定义统一的回调处理接口

public interface ConfirmCallbackService {

/**

* 投递失败后的回调处理

* @param message 投递的消息对象

*/

void confirmCallback(Message message);

}

例:定义点赞案例的实现类(可选):

public class LikeConfirmCallback implements ConfirmCallbackService{

/**

* 注入RedisTemplate

*/

private final RedisTemplate<String,Integer> redisTemplate;

/**

* 执行失败后的反向操作

* @param message 投递的消息对象

*/

@Override

public void confirmCallback(Message message) {

byte[] bytes = message.getBody();

//反向序列化为LikeDTO对象

try {

LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);

if(dto.getLikeStatus()){

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(), dto.getUid());

}else{

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(),dto.getUid());

}

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

小技巧:

可选不单独定义类,而是让业务层本身实现ConfirmCallbackService接口,简化书写操作

分离成策略类则更利于模块化、解耦和扩展。

2、回调上下文: 策略分发器

@Component

@RequiredArgsConstructor

@Slf4j

public class ConfirmCallbackContext {

/**

* 注入RabbitTemplate

*/

private final RabbitTemplate rabbitTemplate;

/**

* 注入所有ConfirmCallbackService的实现类

* 在不同的业务场景调用不同的实现来处理投递失败的业务逻辑

*/

private final Map<String,ConfirmCallbackService> confirmCallbackServiceMap;

/**

* 统一调用回调处理

* 在容器初始化就执行这个方法

*/

@PostConstruct

public void confirmCallback(){

rabbitTemplate.setConfirmCallback((cdata,ack,cause)->{

ReturnedMessage returnedMessage = cdata.getReturned();

if(ack){

log.info("The message was delivered to the{}",returnedMessage);

}else{

//获取业务实现的bean的id

String beanName = returnedMessage.getReplyText();

//根据bean的名称从map中获取相应的实现类

ConfirmCallbackService callbackService = confirmCallbackServiceMap.get(beanName);

callbackService.confirmCallback(returnedMessage.getMessage());

}

});

}

}

核心原理:

Spring Boot 会自动扫描所有实现 ConfirmCallbackService 的 Bean

Bean 名称作为 key,Bean 实例作为 value 注入到 Map<String, ConfirmCallbackService>

ConfirmCallbackContext 根据 replyText 动态找到对应的策略实现类

3.消息发送端封装

@Component

@RequiredArgsConstructor

public class RabbitManager<T> {

private final RabbitTemplate rabbitTemplate;

public void send(String exchange,String routingKey,

String callbackBeanName,T data){

try {

//创建cdata对象并设置一个id

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

//将投递的数据转换为byte[]

byte[] bytes = new ObjectMapper().writeValueAsBytes(data);

//将bytes封装为Message对象

Message message = new Message(bytes);

//创建一个投递失败时返回的消息对象

ReturnedMessage returnedMessage = new ReturnedMessage(message, 0,

callbackBeanName, exchange,routingKey);

//将ReturnedMesssage保存到cdata中

correlationData.setReturned(returnedMessage);

//发送

rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData);

} catch (Exception e) {

throw new RuntimeException(e);

}

}

}

** 关键点:**

callbackBeanName 会被放进 replyText 中,作为“回调策略的指针”。

4.点赞业务逻辑方法

4.1简化写法

@Override

public LikeDTO likeEssay(Integer uid, Integer eid) {

boolean likeStatus = false;

//如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞

if(isLike(eid, uid)) {

//将用户ID从set集合中移除

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);

} else {

likeStatus = true;

//将用户ID添加到set集合中

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);

}

//获取当前帖子在redis中的点赞总数

Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);

//创建LikeDTO封装修改的数据并发布到消息队列

LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);

//发送到mq异步更新到数据库

rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY,

"likeServiceImpl", likeDTO);

return likeDTO;

}

/**

* 消息投递失败后的处理

* @param message 失败后返回的消息

*/

@Override

public void confirmCallback(Message message) {

byte[] bytes = message.getBody();

try {

//反序列化为LikeDTO对象

LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);

//执行反向操作

if(dto.getLikeStatus()) {

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());

} else {

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());

}

} catch (IOException e) {

throw new RuntimeException(e);

}

}

4.2 有业务实现类时

````

public LikeDTO likeEssay(Integer uid, Integer eid) {

boolean likeStatus = false;

//如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞

if(isLike(uid,eid)){

//取消点赞

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());

likeMapper.deleteLike(eid,uid);

}else{

likeStatus = true;

//将用户ID添加到set集合中

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());

}

//获取当前帖子在redis中的点赞总数

Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);

//创建LikeDTO封装修改的数据并发布到消息队列

LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);

//发送到mq异步更新到数据库

rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME,RabbitmqConfig.ROUTING_KEY,

"likeConfirmCallbackService",likeDTO);

return likeDTO;

}

最终目标:当点赞消息从生产者发送到 RabbitMQ 时,一旦投递失败,系统能自动执行反向补偿逻辑,确保 Redis 与数据库的一致性。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/2 19:43:38

从“听得清”到“听得懂”:音频标注技术的演进

在人工智能的发展图谱中&#xff0c;让机器 “听见” 并解读世界&#xff0c;始终是一条充满挑战却意义深远的探索路径。 早期技术突破集中于一个明确目标 ——“听得清”&#xff0c;即实现声音信号向文字符号的高精度转化。然而&#xff0c;随着 AI 应用场景的持续拓展与深化…

作者头像 李华
网站建设 2026/4/6 20:18:21

FFXIV TexTools终极指南:5步打造专属游戏角色

FFXIV TexTools终极指南&#xff1a;5步打造专属游戏角色 【免费下载链接】FFXIV_TexTools_UI 项目地址: https://gitcode.com/gh_mirrors/ff/FFXIV_TexTools_UI 想要在《最终幻想14》中创造独一无二的个性化角色吗&#xff1f;FFXIV TexTools作为专业的游戏模型与贴图…

作者头像 李华
网站建设 2026/4/3 0:39:05

终极信息安全指南:快速上手NIST SP800-53中文翻译版

终极信息安全指南&#xff1a;快速上手NIST SP800-53中文翻译版 【免费下载链接】NISTSP800-53翻译稿 本开源项目提供了NIST SP800-53早期版本的中文翻译稿&#xff0c;致力于为信息安全领域的研究者和技术人员提供权威参考。翻译内容详尽准确&#xff0c;帮助用户深入理解信息…

作者头像 李华
网站建设 2026/4/3 11:34:13

如何快速配置Reader:面向新手的完整小说阅读器使用指南

如何快速配置Reader&#xff1a;面向新手的完整小说阅读器使用指南 【免费下载链接】Reader-v2.0.0.4-x64PC端小说阅读器工具下载 Reader是一款专为小说爱好者设计的绿色、开源、免费的阅读神器&#xff0c;致力于提供极致的阅读体验。本版本为v2.0.0.4&#xff0c;发布时间为2…

作者头像 李华