与Kafka类似,RocketMQ也支持基于队列(分区)的顺序消费机制。具体表现为:同一队列内的消息保证有序,而不同队列间的消息则是无序的。
实现顺序消息发送时,生产者需在send方法中传入MessageQueueSelector。该接口的select方法用于确定消息投递的目标队列,常见实现方式是采用取模路由策略:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);注意必须使用同步发送方式确保顺序性。
消费者端通过MessageListenerOrderly模式实现顺序消费:
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { System.out.printf("Receive order msg:" + new String(msgs.get(0).getBody())); return ConsumeOrderlyStatus.SUCCESS; } });顺序消费通过三级加锁机制保障:
- Broker级锁:确保消息只投递给特定消费者
- MessageQueue锁:保证单线程处理队列消息
- ProcessQueue锁:防止重平衡时的重复消费
扩展说明: 第三把锁主要应对消费者集群重平衡场景。当队列需要重新分配时,该锁确保正在处理的消息能完成消费并提交位点,避免新消费者重复消费。若不加此锁,可能导致位点未提交的消息被重复处理。
需注意:顺序消费会降低系统吞吐量,且存在消息阻塞传递效应,应谨慎使用。