一、核心概念理解
事务消息解决什么问题?
java
复制
下载
// 分布式事务典型问题:本地事务与消息发送的一致性 // 传统方式存在的问题: 1. 先发消息,后执行本地事务 → 消息发送成功但本地事务失败 2. 先执行本地事务,后发消息 → 本地事务成功但消息发送失败
RocketMQ事务消息的核心机制
text
复制
下载
Producer发送Half消息 → Broker存储Half消息 → 执行本地事务 ↓ Broker等待事务状态回查 ← Producer返回本地事务结果 ↓ 根据结果提交或回滚消息
二、两阶段提交详细流程
第一阶段:发送Half消息
java
复制
下载
public class TransactionProducer { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); // 2. 设置事务监听器(核心) producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * @param msg Half消息 * @param arg 业务参数 * @return 本地事务状态 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地数据库事务 boolean success = doLocalBusinessTransaction(msg, arg); if (success) { System.out.println("本地事务执行成功,提交消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println("本地事务执行失败,回滚消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { System.out.println("本地事务执行异常,回查"); return LocalTransactionState.UNKNOW; } } /** * 事务回查(Broker主动查询事务状态) * @param msg Half消息 * @return 事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据业务ID查询本地事务状态 String transactionId = msg.getTransactionId(); boolean status = queryLocalTransactionStatus(transactionId); if (status) { System.out.println("事务回查:本地事务已提交"); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println("事务回查:本地事务已回滚"); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 3. 启动生产者 producer.start(); // 4. 发送事务消息 Message msg = new Message("TransactionTopic", "TagA", "Order-001".getBytes(StandardCharsets.UTF_8)); // 设置事务ID(关键) msg.setKeys("TXN-" + System.currentTimeMillis()); // 发送Half消息(第一阶段) SendResult sendResult = producer.sendMessageInTransaction(msg, // 业务参数,会在executeLocalTransaction中传递 new BusinessParam("orderId", "123456", 100.00) ); System.out.println("Half消息发送结果: " + sendResult.getSendStatus()); } }篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
三、完整执行时序图
text
复制
下载
┌─────────┐ ┌────────┐ ┌────────┐ │ Producer│ │ Broker │ │ 本地DB │ └────┬────┘ └───┬────┘ └────┬───┘ │ 1.发送Half消息 │ │ │───────────────>│ │ │ │ │ │ │ 2.存储Half消息 │ │ │───────────────>│ │ │ │ │ 3.返回Half成功 │ │ │<───────────────│ │ │ │ │ │ 4.执行本地事务 │ │ │────────────────────────────────>│ │ │ │ │ 5.返回事务状态 │ │ │───────────────>│ │ │ │ │ │ │6.提交/回滚消息 │ │ │───────────────>│ │ │ │ │(可能)7.事务回查 │ │ │<───────────────│ │ │ │ │ │ 8.返回回查结果 │ │ │───────────────>│ │ │ │ │ │ │9.最终提交/回滚 │ │ │───────────────>│
四、关键配置参数
yaml
复制
下载
# Broker端配置 broker.conf: transactionCheckMax: 15 # 最大回查次数 transactionCheckInterval: 60000 # 回查间隔(ms) transactionTimeOut: 6000 # 超时时间(ms) # Producer端配置 producer: checkThreadPoolMinSize: 1 # 回查线程池最小 checkThreadPoolMaxSize: 1 # 回查线程池最大 checkRequestHoldMax: 2000 # 回查请求队列大小
五、代码实现最佳实践
1. 完整的订单事务示例
java
复制
下载
@Service public class OrderTransactionService { @Resource private OrderMapper orderMapper; @Resource private TransactionMQProducer transactionMQProducer; /** * 创建订单事务消息 */ public void createOrderWithTransaction(OrderDTO orderDTO) { // 构建消息 Message msg = new Message("ORDER_TOPIC", "CREATE", JSON.toJSONBytes(orderDTO)); // 设置业务标识 msg.setKeys("ORDER_" + orderDTO.getOrderNo()); msg.putUserProperty("businessType", "ORDER_CREATE"); // 发送事务消息 SendResult sendResult = transactionMQProducer.sendMessageInTransaction( msg, new OrderTransactionArg(orderDTO) ); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException("Half消息发送失败"); } } /** * 事务监听器实现 */ @Component public class OrderTransactionListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { OrderTransactionArg transactionArg = (OrderTransactionArg) arg; OrderDTO orderDTO = transactionArg.getOrderDTO(); try { // 1. 保存订单到数据库 Order order = convertToOrder(orderDTO); orderMapper.insert(order); // 2. 扣减库存(调用库存服务) boolean deductResult = inventoryService.deductStock( orderDTO.getProductId(), orderDTO.getQuantity() ); if (!deductResult) { // 库存不足,回滚本地事务 orderMapper.deleteById(order.getId()); return LocalTransactionState.ROLLBACK_MESSAGE; } // 3. 记录事务日志(用于回查) transactionLogService.saveTransactionLog( msg.getTransactionId(), "ORDER_CREATE", order.getId(), LocalTransactionState.COMMIT_MESSAGE.name() ); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("订单本地事务执行异常", e); // 记录异常状态 transactionLogService.saveTransactionLog( msg.getTransactionId(), "ORDER_CREATE", null, LocalTransactionState.UNKNOW.name() ); return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据事务ID查询事务日志 String transactionId = msg.getTransactionId(); TransactionLog log = transactionLogService.getByTransactionId(transactionId); if (log == null) { // 没有事务记录,需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } if ("COMMIT_MESSAGE".equals(log.getStatus())) { // 事务已提交 return LocalTransactionState.COMMIT_MESSAGE; } else { // 事务需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } } } /** * 事务参数封装 */ @Data @AllArgsConstructor public static class OrderTransactionArg { private OrderDTO orderDTO; } }2. 消费端幂等处理
java
复制
下载
@Component @RocketMQMessageListener( topic = "ORDER_TOPIC", consumerGroup = "ORDER_CONSUMER_GROUP" ) public class OrderConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { // 1. 检查消息幂等性 String messageId = message.getMsgId(); if (redisTemplate.hasKey("MSG_" + messageId)) { log.info("消息已处理,跳过: {}", messageId); return; } // 2. 解析消息 OrderDTO orderDTO = JSON.parseObject(message.getBody(), OrderDTO.class); // 3. 业务处理 try { // 更新订单状态为已确认 orderService.confirmOrder(orderDTO.getOrderNo()); // 4. 记录已处理消息 redisTemplate.opsForValue().set( "MSG_" + messageId, "PROCESSED", 1, TimeUnit.HOURS ); } catch (Exception e) { log.error("订单处理失败,将重试", e); throw new RuntimeException(e); } } }六、面试问题回答要点
问题:RocketMQ事务消息如何实现二阶段提交?
回答结构:
概念解释
"RocketMQ事务消息通过二阶段提交保证分布式事务的最终一致性"
"核心思想:将本地事务和消息发送绑定,通过Half消息和状态回查机制实现"
第一阶段(Half消息阶段)
"Producer发送Half消息到Broker,Broker存储但不对Consumer可见"
"Half消息发送成功后,执行本地事务"
"本地事务执行结果返回给Broker:COMMIT、ROLLBACK或UNKNOWN"
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
第二阶段(状态确认阶段)
"如果本地事务返回COMMIT/ROLLBACK,Broker立即提交/回滚消息"
"如果返回UNKNOWN,Broker会发起事务状态回查"
"Producer实现TransactionListener.checkLocalTransaction()进行状态查询"
关键机制
"事务状态回查:解决网络超时或生产者宕机问题"
"消息幂等性:消费端需要处理重复消息"
"超时机制:超过配置时间未确认的消息会自动回滚"
代码示例
java
复制 下载// 简要展示核心代码结构 producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(...) { // 执行本地业务 } public LocalTransactionState checkLocalTransaction(...) { // 状态回查 } });适用场景
"订单创建+通知库存"
"支付成功+发送通知"
"任何需要保证本地事务和消息发送一致性的场景"
注意事项
"事务消息不支持定时和批量消息"
"确保checkLocalTransaction方法的幂等性"
"合理配置回查次数和间隔"
面试加分项
提到"最大努力通知型事务"
对比TCC、Saga等分布式事务方案
强调消息幂等处理的重要性
提及RocketMQ 4.3+的事务消息优化
这样的回答既展示了理论知识,又体现了实际编码能力,适合中高级Java岗位面试。