视频看了几百小时还迷糊?关注我,几分钟让你秒懂!
在使用 RabbitMQ 时,你是否遇到过以下问题?
- 消息发出去了,但消费者好像“没反应”;
- 系统突然变慢,CPU 不高,但订单一直不处理;
- 重启服务后,大量消息重新被消费,疑似重复处理……
这些问题,很可能是因为“未确认消息”(Unacked Messages)堆积导致的!
今天我们就用Spring Boot + Java,结合 RabbitMQ 管理界面和代码手段,手把手教你如何监控、分析和解决 Unacked 消息问题,小白也能轻松上手!
🧩 一、什么是 “Unacked 消息”?
在 RabbitMQ 中,消息从队列到消费者的过程分为三个状态:
| 状态 | 说明 |
|---|---|
| Ready | 消息在队列中,等待被消费 |
| Unacked | 消息已被推送给消费者,但尚未收到 ACK 确认 |
| Acked | 消息已被成功确认,从队列中移除 |
✅Unacked = 已发送但未确认的消息
⚠️ 如果 Unacked 长期不减少,说明消费者“卡住了”或“崩溃了”!
🔍 二、为什么 Unacked 消息会堆积?
常见原因包括:
- 消费者处理太慢(如数据库慢、外部 API 超时);
- prefetch 设置过大,一个消费者拉取太多消息,处理不过来;
- 手动 ACK 忘记调用(比如异常没捕获,没执行
basicAck); - 消费者进程挂掉,但连接未正常关闭,RabbitMQ 还在等 ACK;
- 线程阻塞或死锁,导致 ACK 无法发出。
📊 三、方法一:通过 RabbitMQ 管理界面监控(最直观!)
步骤 1:启用管理插件(如未开启)
rabbitmq-plugins enable rabbitmq_management默认访问地址:http://localhost:15672
账号密码默认:guest / guest
步骤 2:查看队列详情
进入Queues标签页,找到你的队列(如order.queue),你会看到类似:
Messages: 1000 - Ready: 200 - Unacked: 800 ← 这里就是未确认消息数!🔴 如果Unacked 数量持续增长或长期不为 0,说明消费异常!
步骤 3:查看消费者连接状态
点击队列名,往下拉可以看到Consumers列表:
- 检查每个消费者的“Ack required”是否为
true; - 查看“Unacked message count”字段,确认哪个消费者囤积了消息;
- 如果消费者状态是
idle但 Unacked 很高,说明它“卡住”了!
💻 四、方法二:通过 Spring Boot 代码主动监控(高级用法)
虽然管理界面很直观,但在生产环境中,我们往往需要程序化监控,比如告警或自动扩缩容。
方案:使用 RabbitMQ HTTP API 获取队列信息
1. 添加依赖(用于调用 HTTP API)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>2. 编写监控服务类
@Service public class RabbitMqMonitorService { private static final String RABBITMQ_API_URL = "http://localhost:15672/api/queues/%s/%s"; private static final String VHOST = "%2F"; // 默认 vhost 是 "/",需 URL 编码为 %2F @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public Map<String, Object> getQueueStats(String queueName) { String url = String.format(RABBITMQ_API_URL, VHOST, queueName); RestTemplate restTemplate = new RestTemplate(); restTemplate.getInterceptors().add((request, body, execution) -> { String auth = username + ":" + password; String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes()); request.getHeaders().add("Authorization", "Basic " + encodedAuth); return execution.execute(request, body); }); try { ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class); return response.getBody(); } catch (Exception e) { throw new RuntimeException("Failed to fetch RabbitMQ queue stats", e); } } public int getUnackedCount(String queueName) { Map<String, Object> stats = getQueueStats(queueName); return (int) stats.getOrDefault("messages_unacknowledged", 0); } }3. 定时检查 + 告警(示例)
@Component public class UnackedMessageChecker { @Autowired private RabbitMqMonitorService monitorService; private static final String QUEUE_NAME = "order.queue"; private static final int THRESHOLD = 50; // 超过50条未确认就告警 @Scheduled(fixedRate = 30_000) // 每30秒检查一次 public void checkUnackedMessages() { int unacked = monitorService.getUnackedCount(QUEUE_NAME); if (unacked > THRESHOLD) { System.err.println("⚠️ 警告:队列 " + QUEUE_NAME + " 有 " + unacked + " 条未确认消息!"); // 这里可以集成企业微信、钉钉、邮件等告警 } else { System.out.println("✅ 队列 " + QUEUE_NAME + " Unacked: " + unacked); } } }✅ 启动类记得加
@EnableScheduling
🛠 五、方法三:日志 + 指标埋点(开发阶段推荐)
在消费者代码中记录关键日志:
@RabbitListener(queues = "order.queue") public void handleMessage(Message message, Channel channel) throws Exception { long tag = message.getMessageProperties().getDeliveryTag(); log.info("📥 收到消息,deliveryTag={}, 开始处理...", tag); try { // 业务处理 doBusiness(message); channel.basicAck(tag, false); log.info("✅ 消息处理完成并ACK,deliveryTag={}", tag); } catch (Exception e) { log.error("❌ 消息处理失败,deliveryTag={}", tag, e); channel.basicNack(tag, false, true); } }配合日志系统(如 ELK、Loki),你可以:
- 统计“收到消息”和“ACK 成功”的数量差;
- 发现哪些
deliveryTag卡住了; - 快速定位异常堆栈。
⚠️ 六、反例:忽略 Unacked 监控的后果
场景模拟:
prefetch=100,concurrency=1;- 消费者处理第 1 条消息时发生死循环;
- RabbitMQ 已经把 100 条消息全部推给这个消费者;
- 结果:99 条消息处于 Unacked 状态,其他消费者无法接手;
- 整个队列“假死”,新消息不断堆积,Ready 越来越多!
💥 用户下单无响应,运维却看不出 CPU 或内存异常——典型的“静默故障”!
✅ 七、最佳实践建议
| 项目 | 建议 |
|---|---|
| prefetch | 设为 5~15,避免单个消费者囤积过多 |
| ACK 模式 | 必须用manual,确保业务成功才 ACK |
| 监控频率 | 生产环境每 30 秒~1 分钟检查一次 Unacked |
| 告警阈值 | 根据业务量设定,如超过并发数 × prefetch 的 80% |
| 消费者健康检查 | 结合 Spring Boot Actuator 暴露/actuator/health |
🎯 总结
- Unacked 消息 = 已发送但未确认的消息;
- 长期堆积 = 消费者异常或配置不当;
- 监控手段:
- ✅ RabbitMQ 管理界面(最简单);
- ✅ HTTP API + 代码告警(适合自动化);
- ✅ 日志埋点(适合开发调试);
- 核心原则:及时 ACK,合理 prefetch,实时监控!
只要盯住 “Unacked” 这个指标,你就掌握了 RabbitMQ 消费健康的“脉搏”!
视频看了几百小时还迷糊?关注我,几分钟让你秒懂!