news 2026/5/27 1:01:55

消息队列顺序性保证实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
消息队列顺序性保证实战

消息队列顺序性保证实战

一、消息顺序性概述

消息队列的顺序性是指消息按照发送顺序被消费的特性,在金融交易、订单处理等场景至关重要。

1.1 顺序性问题场景

┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Producer │────▶│ Queue │────▶│ Consumer │ │ (生产者) │ │ (队列) │ │ (消费者) │ └─────────────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 1 │ │ │ 消费消息1 │ │ └─────────────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 2 │ │ │ 消费消息2 │ │ └─────────────┘ │ ▼ 问题:消息1和消息2顺序可能错乱

1.2 顺序性破坏原因

原因说明
多分区同一主题多个分区,消息可能发送到不同分区
多消费者多个消费者并行消费,处理速度不同
重试机制消息重试可能打乱顺序
网络延迟网络波动导致消息到达顺序变化

二、Kafka顺序性保证

2.1 单分区单消费者

apiVersion: apps/v1 kind: Deployment metadata: name: kafka-consumer spec: replicas: 1 template: spec: containers: - name: consumer image: kafka-consumer:1.0.0 env: - name: KAFKA_TOPIC value: "orders" - name: KAFKA_GROUP_ID value: "order-group" - name: KAFKA_PARTITION value: "0"

2.2 分区键策略

public class OrderProducer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; public void sendOrder(Order order) { // 使用订单ID的哈希值作为分区键 String key = String.valueOf(order.getUserId()); kafkaTemplate.send("orders", key, order); } }

2.3 自定义分区器

public class OrderPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String orderKey = (String) key; int numPartitions = cluster.partitionCountForTopic(topic); return Math.abs(orderKey.hashCode()) % numPartitions; } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }

三、RocketMQ顺序性保证

3.1 同步发送

public class RocketMQProducer { private DefaultMQProducer producer; public void sendOrderMessage(Order order) throws Exception { Message message = new Message( "OrderTopic", "OrderTag", order.getOrderId(), JSON.toJSONBytes(order) ); // 同步发送,保证顺序 SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) { Long orderId = (Long) arg; int index = (int) (orderId % queues.size()); return queues.get(index); } }, order.getOrderId()); } }

3.2 顺序消费

public class RocketMQConsumer { private DefaultMQPushConsumer consumer; public void consumeOrderMessages() throws Exception { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { Order order = JSON.parseObject(msg.getBody(), Order.class); processOrder(order); } return ConsumeOrderlyStatus.SUCCESS; } }); } }

四、RabbitMQ顺序性保证

4.1 单队列单消费者

@Component public class RabbitMQConsumer { @RabbitListener(queues = "order-queue", concurrency = "1") public void consumeOrder(Order order) { processOrder(order); } }

4.2 消息优先级

public class RabbitMQProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrderWithPriority(Order order, int priority) { MessageProperties props = new MessageProperties(); props.setPriority(priority); Message message = new Message( JSON.toJSONBytes(order), props ); rabbitTemplate.send("order-exchange", "order-routing-key", message); } }

五、消息顺序性最佳实践

5.1 业务层面保证

public class OrderService { @Transactional public void processOrders(List<Order> orders) { // 按订单ID排序 orders.sort(Comparator.comparingLong(Order::getOrderId)); for (Order order : orders) { // 处理订单逻辑 processOrder(order); } } }

5.2 消息去重

@Component public class MessageDeduplicationService { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String PREFIX = "msg:dedupe:"; public boolean isDuplicate(String messageId) { String key = PREFIX + messageId; Boolean exists = redisTemplate.hasKey(key); if (Boolean.TRUE.equals(exists)) { return true; } redisTemplate.opsForValue().set(key, "true", 24, TimeUnit.HOURS); return false; } }

5.3 死信队列

# RabbitMQ死信队列配置 spring: rabbitmq: listener: simple: retry: enabled: true max-attempts: 3 initial-interval: 1000ms template: reply-timeout: 5000ms # 死信交换机和队列 @Configuration public class DeadLetterConfig { @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("dead-letter-queue").build(); } @Bean public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("dead-letter-exchange").durable(true).build(); } }

六、顺序性验证

6.1 测试用例

@Test void testMessageOrder() { // 发送100条有序消息 for (int i = 0; i < 100; i++) { producer.send("test-topic", String.valueOf(i), "message-" + i); } // 收集消费结果 List<String> received = consumer.receiveAll(); // 验证顺序 for (int i = 0; i < received.size(); i++) { assertEquals("message-" + i, received.get(i)); } }

6.2 性能测试

# 使用kafka-producer-perf-test测试 kafka-producer-perf-test.sh \ --topic orders \ --num-records 100000 \ --record-size 1024 \ --throughput 10000 \ --producer-props bootstrap.servers=kafka:9092

七、总结

消息顺序性保证策略:

  1. 单分区单消费者:最简单的顺序保证方式
  2. 分区键策略:按业务键哈希分配分区
  3. 同步发送:确保消息按顺序发送
  4. 顺序消费:使用消息队列的顺序消费特性
  5. 业务层面排序:消费后再排序确保顺序

根据业务场景选择合适的顺序性保证方案。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/27 1:01:11

基于HTTP 418与AI的智能茶壶:前端开发与API安全实践

1. 项目概述&#xff1a;一个“叛逆”的智能茶壶最近在DEV社区参加了一个挺有意思的愚人节挑战赛&#xff0c;主题是“无用的发明”。我琢磨着&#xff0c;既然要“无用”&#xff0c;那就得把“无用”做到极致&#xff0c;还得带点幽默和讽刺。于是&#xff0c;我动手做了一个…

作者头像 李华
网站建设 2026/5/27 0:53:08

LangGraph多智能体协作效率:从理论模型到工程实践的量化分析

LangGraph多智能体协作效率&#xff1a;从理论模型到工程实践的量化分析 副标题&#xff1a;构建高吞吐量、低延迟、可解释的工业级智能体系统全链路指南摘要/引言 问题陈述 在大语言模型&#xff08;LLM&#xff09;驱动的智能体系统&#xff08;Multi-Agent System, MAS&…

作者头像 李华
网站建设 2026/5/27 0:42:58

Python类的本质:从运行时对象到生产级设计

我试过很多次教新手理解 Python 类——不是照着文档念定义&#xff0c;而是让他们真正“摸到”类的形状。你打开 Python 解释器输入type(42)&#xff0c;它回你<class int>&#xff1b;输type("hello")&#xff0c;回<class str>&#xff1b;哪怕你写个空…

作者头像 李华
网站建设 2026/5/27 0:42:55

A2UI框架:构建确定性AI Agent交互,实现机器可读与透明化决策

1. 项目概述&#xff1a;从“黑盒”到“白盒”的确定性交互革命如果你在过去几年里深度参与过任何与AI Agent相关的项目&#xff0c;大概率都经历过这样的场景&#xff1a;你精心设计了一个功能强大的智能体&#xff0c;它集成了最新的语言模型、配备了丰富的工具链&#xff0c…

作者头像 李华
网站建设 2026/5/27 0:39:13

D5017UK,175MHz下150W高功率与10dB高增益的完美结合

简介今天我要向大家介绍的是 Semelab 的硅 DMOS RF FET 晶体管——D5017UK。这是一款专为 HF/VHF/UHF 通信频段&#xff08;1 MHz 至 175 MHz&#xff09;设计的单端式射频功率场效应管&#xff0c;在 50V 工作电压、175 MHz 频率下可提供 150W 的输出功率。作为一款高性能射频…

作者头像 李华