news 2026/4/1 2:09:15

宇树Java面试被问:RocketMQ事务消息的二阶段提交实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
宇树Java面试被问:RocketMQ事务消息的二阶段提交实现

一、核心概念理解

事务消息解决什么问题?

java

复制

下载

// 分布式事务典型问题:本地事务与消息发送的一致性 // 传统方式存在的问题: 1. 先发消息,后执行本地事务 → 消息发送成功但本地事务失败 2. 先执行本地事务,后发消息 → 本地事务成功但消息发送失败

RocketMQ事务消息的核心机制

text

复制

下载

Producer发送Half消息 → Broker存储Half消息 → 执行本地事务 ↓ Broker等待事务状态回查 ← Producer返回本地事务结果 ↓ 根据结果提交或回滚消息

二、两阶段提交详细流程

第一阶段:发送Half消息

java

复制

下载

public class TransactionProducer { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); // 2. 设置事务监听器(核心) producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * @param msg Half消息 * @param arg 业务参数 * @return 本地事务状态 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地数据库事务 boolean success = doLocalBusinessTransaction(msg, arg); if (success) { System.out.println("本地事务执行成功,提交消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println("本地事务执行失败,回滚消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { System.out.println("本地事务执行异常,回查"); return LocalTransactionState.UNKNOW; } } /** * 事务回查(Broker主动查询事务状态) * @param msg Half消息 * @return 事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据业务ID查询本地事务状态 String transactionId = msg.getTransactionId(); boolean status = queryLocalTransactionStatus(transactionId); if (status) { System.out.println("事务回查:本地事务已提交"); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println("事务回查:本地事务已回滚"); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 3. 启动生产者 producer.start(); // 4. 发送事务消息 Message msg = new Message("TransactionTopic", "TagA", "Order-001".getBytes(StandardCharsets.UTF_8)); // 设置事务ID(关键) msg.setKeys("TXN-" + System.currentTimeMillis()); // 发送Half消息(第一阶段) SendResult sendResult = producer.sendMessageInTransaction(msg, // 业务参数,会在executeLocalTransaction中传递 new BusinessParam("orderId", "123456", 100.00) ); System.out.println("Half消息发送结果: " + sendResult.getSendStatus()); } }

篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc

需要全套面试笔记及答案
【点击此处即可/免费获取】

三、完整执行时序图

text

复制

下载

┌─────────┐ ┌────────┐ ┌────────┐ │ Producer│ │ Broker │ │ 本地DB │ └────┬────┘ └───┬────┘ └────┬───┘ │ 1.发送Half消息 │ │ │───────────────>│ │ │ │ │ │ │ 2.存储Half消息 │ │ │───────────────>│ │ │ │ │ 3.返回Half成功 │ │ │<───────────────│ │ │ │ │ │ 4.执行本地事务 │ │ │────────────────────────────────>│ │ │ │ │ 5.返回事务状态 │ │ │───────────────>│ │ │ │ │ │ │6.提交/回滚消息 │ │ │───────────────>│ │ │ │ │(可能)7.事务回查 │ │ │<───────────────│ │ │ │ │ │ 8.返回回查结果 │ │ │───────────────>│ │ │ │ │ │ │9.最终提交/回滚 │ │ │───────────────>│

四、关键配置参数

yaml

复制

下载

# Broker端配置 broker.conf: transactionCheckMax: 15 # 最大回查次数 transactionCheckInterval: 60000 # 回查间隔(ms) transactionTimeOut: 6000 # 超时时间(ms) # Producer端配置 producer: checkThreadPoolMinSize: 1 # 回查线程池最小 checkThreadPoolMaxSize: 1 # 回查线程池最大 checkRequestHoldMax: 2000 # 回查请求队列大小

五、代码实现最佳实践

1. 完整的订单事务示例

java

复制

下载

@Service public class OrderTransactionService { @Resource private OrderMapper orderMapper; @Resource private TransactionMQProducer transactionMQProducer; /** * 创建订单事务消息 */ public void createOrderWithTransaction(OrderDTO orderDTO) { // 构建消息 Message msg = new Message("ORDER_TOPIC", "CREATE", JSON.toJSONBytes(orderDTO)); // 设置业务标识 msg.setKeys("ORDER_" + orderDTO.getOrderNo()); msg.putUserProperty("businessType", "ORDER_CREATE"); // 发送事务消息 SendResult sendResult = transactionMQProducer.sendMessageInTransaction( msg, new OrderTransactionArg(orderDTO) ); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException("Half消息发送失败"); } } /** * 事务监听器实现 */ @Component public class OrderTransactionListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { OrderTransactionArg transactionArg = (OrderTransactionArg) arg; OrderDTO orderDTO = transactionArg.getOrderDTO(); try { // 1. 保存订单到数据库 Order order = convertToOrder(orderDTO); orderMapper.insert(order); // 2. 扣减库存(调用库存服务) boolean deductResult = inventoryService.deductStock( orderDTO.getProductId(), orderDTO.getQuantity() ); if (!deductResult) { // 库存不足,回滚本地事务 orderMapper.deleteById(order.getId()); return LocalTransactionState.ROLLBACK_MESSAGE; } // 3. 记录事务日志(用于回查) transactionLogService.saveTransactionLog( msg.getTransactionId(), "ORDER_CREATE", order.getId(), LocalTransactionState.COMMIT_MESSAGE.name() ); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("订单本地事务执行异常", e); // 记录异常状态 transactionLogService.saveTransactionLog( msg.getTransactionId(), "ORDER_CREATE", null, LocalTransactionState.UNKNOW.name() ); return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据事务ID查询事务日志 String transactionId = msg.getTransactionId(); TransactionLog log = transactionLogService.getByTransactionId(transactionId); if (log == null) { // 没有事务记录,需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } if ("COMMIT_MESSAGE".equals(log.getStatus())) { // 事务已提交 return LocalTransactionState.COMMIT_MESSAGE; } else { // 事务需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } } } /** * 事务参数封装 */ @Data @AllArgsConstructor public static class OrderTransactionArg { private OrderDTO orderDTO; } }

2. 消费端幂等处理

java

复制

下载

@Component @RocketMQMessageListener( topic = "ORDER_TOPIC", consumerGroup = "ORDER_CONSUMER_GROUP" ) public class OrderConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { // 1. 检查消息幂等性 String messageId = message.getMsgId(); if (redisTemplate.hasKey("MSG_" + messageId)) { log.info("消息已处理,跳过: {}", messageId); return; } // 2. 解析消息 OrderDTO orderDTO = JSON.parseObject(message.getBody(), OrderDTO.class); // 3. 业务处理 try { // 更新订单状态为已确认 orderService.confirmOrder(orderDTO.getOrderNo()); // 4. 记录已处理消息 redisTemplate.opsForValue().set( "MSG_" + messageId, "PROCESSED", 1, TimeUnit.HOURS ); } catch (Exception e) { log.error("订单处理失败,将重试", e); throw new RuntimeException(e); } } }

六、面试问题回答要点

问题:RocketMQ事务消息如何实现二阶段提交?

回答结构:

  1. 概念解释

    • "RocketMQ事务消息通过二阶段提交保证分布式事务的最终一致性"

    • "核心思想:将本地事务和消息发送绑定,通过Half消息和状态回查机制实现"

  2. 第一阶段(Half消息阶段)

    • "Producer发送Half消息到Broker,Broker存储但不对Consumer可见"

    • "Half消息发送成功后,执行本地事务"

    • "本地事务执行结果返回给Broker:COMMIT、ROLLBACK或UNKNOWN"

篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc

需要全套面试笔记及答案
【点击此处即可/免费获取】​​​

  1. 第二阶段(状态确认阶段)

    • "如果本地事务返回COMMIT/ROLLBACK,Broker立即提交/回滚消息"

    • "如果返回UNKNOWN,Broker会发起事务状态回查"

    • "Producer实现TransactionListener.checkLocalTransaction()进行状态查询"

  2. 关键机制

    • "事务状态回查:解决网络超时或生产者宕机问题"

    • "消息幂等性:消费端需要处理重复消息"

    • "超时机制:超过配置时间未确认的消息会自动回滚"

  3. 代码示例

    java

    复制 下载
    // 简要展示核心代码结构 producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(...) { // 执行本地业务 } public LocalTransactionState checkLocalTransaction(...) { // 状态回查 } });
  4. 适用场景

    • "订单创建+通知库存"

    • "支付成功+发送通知"

    • "任何需要保证本地事务和消息发送一致性的场景"

  5. 注意事项

    • "事务消息不支持定时和批量消息"

    • "确保checkLocalTransaction方法的幂等性"

    • "合理配置回查次数和间隔"

面试加分项

  • 提到"最大努力通知型事务"

  • 对比TCC、Saga等分布式事务方案

  • 强调消息幂等处理的重要性

  • 提及RocketMQ 4.3+的事务消息优化

这样的回答既展示了理论知识,又体现了实际编码能力,适合中高级Java岗位面试。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/28 21:33:57

万和制药和安胶囊,营养补充与常见感冒药联用更安心

感冒期间&#xff0c;人体代谢加快&#xff0c;营养消耗增加&#xff0c;很多人会在服用感冒药的同时&#xff0c;选择营养补充剂提升抵抗力&#xff0c;但用药安全是核心考量。深圳万和制药的和安复方氨基酸胶囊&#xff08;8-11&#xff09;&#xff0c;与常见感冒药联用的安…

作者头像 李华
网站建设 2026/3/23 17:39:45

【Java并发】ForkJoinPool和ThreadPoolExecutor的区别

【Java并发】ForkJoinPool和ThreadPoolExecutor的区别1、ForkJoinPool和ThreadPoolExecutor的区别2、为什么CompletableFuture使用ForkJoinPool&#xff1f;1、ForkJoinPool和ThreadPoolExecutor的区别 ForkJoinPool和ExecutorService都是Java中常用的线程池的实现&#xff0c…

作者头像 李华
网站建设 2026/3/24 2:54:25

第 4 篇:策略模式 (Strategy) —— 算法的热插拔艺术

专栏导读&#xff1a;你是否遇到过这种崩溃瞬间&#xff1a;产品卖给 A 客户要用 Modbus 协议&#xff0c;卖给 B 客户要用私有协议&#xff0c;卖给 C 客户要加密传输。你的代码里是不是充斥着无数的 #ifdef CUSTOMER_A 或者 if (mode 1)&#xff1f; 策略模式教你用 C 语言实…

作者头像 李华
网站建设 2026/3/30 21:58:35

Sentinel 流控配置案例:两次请求的时间间隔必须在3秒以上

要实现“两次请求的时间间隔必须在3秒以上”这种需求&#xff0c;需要根据具体的业务场景选择合适的Sentinel配置方式。本文博主将介绍几种实现方案&#xff1a; 方案1&#xff1a;使用排队等待流控效果&#xff08;最接近需求&#xff09; # Sentinel规则配置&#xff08;对…

作者头像 李华
网站建设 2026/3/28 7:47:40

告别低效内耗!这款CRM营销管理系统,让职场人少走80%的弯路

你是否每天被这些职场难题裹挟&#xff0c;身心俱疲却业绩平平&#xff1f; ✅ 营销获客像“大海捞针”&#xff1a;砸钱投渠道、熬夜做推广&#xff0c;线索来了却杂乱无章&#xff0c;高价值客户藏在信息堆里无人问津&#xff0c;获客成本居高不下&#xff0c;转化效果惨不忍…

作者头像 李华