之前聊过 Disruptor(高性能队列 Disruptor),它的性能逆天,但有个致命问题:纯内存,进程挂了数据就丢了。
Chronicle Queue 就是来解决这个问题的——持久化的 Disruptor。
解决什么问题
Disruptor 的问题: - 进程崩溃 → 数据全丢 - 重启后无法恢复之前的消息 - 只能作为临时缓冲 Chronicle Queue 的改进: - 数据落盘,进程重启不丢失 - 支持消息重放(Replay) - 保留了 Disruptor 的高性能同样的团队(LMAX),同样的设计理念,只是一个加了持久化。
核心原理
MMAP 内存映射
Chronicle Queue 之所以快,关键在于MMAP(内存映射):
// 传统文件写入write(fd,data,size);// 用户态 → 内核态 → 磁盘// MMAP 写入memcpy(buffer,data,size);// 直接写内存,由 OS 异步刷盘MMAP 把磁盘文件映射到内存,写入像写内存一样快,操作系统会自己在后台刷盘。
文件结构
chronicle-queue/ ├── 20190101.data # cycle 文件,约 64MB ├── 20190102.data ├── 20190103.data └── 20190104.data 每个文件叫做一个 cycle,写满自动切换下一个数据写入流程
// 1. 写入数据到 MMAP 内存appender.writeText("订单创建: orderId=1001");// 2. 数据立即可读(内存读取)Stringmsg=tailer.readText();// 3. OS 在后台异步刷盘(不需要等待)快速开始
引入依赖
<dependency><groupId>net.openhft</groupId><artifactId>chronicle_queue</artifactId><version>5.22.4</version></dependency>基础用法
publicclassChronicleQueueDemo{publicstaticvoidmain(String[]args){Stringpath="./data/orders";// 1. 创建队列(单机模式)ChronicleQueuequeue=ChronicleQueueBuilder.single(path)// 单文件模式.build();// 2. 获取写入器ExcerptAppenderappender=queue.acquireAppender();// 3. 写入消息(支持多种格式)appender.writeText("订单1: 100元");appender.writeText("订单2: 200元");appender.writeText("订单3: 300元");// 4. 获取读取器ExcerptTailertailer=queue.createTailer();// 5. 读取所有消息while(true){Stringtext=tailer.readText();if(text==null){break;// 没有更多消息}System.out.println("收到: "+text);}queue.close();}}输出
收到: 订单1: 100元 收到: 订单2: 200元 收到: 订单3: 300元写入/读取二进制数据
Text 方便但不类型安全,实际项目推荐用二进制:
// 1. 定义 Order 消息@DatapublicclassOrder{privatelongorderId;privatedoubleamount;privateStringstatus;}// 2. 使用 BinaryWire 写入ChronicleQueuequeue=ChronicleQueueBuilder.single("./data/orders").wireType(WireType.BINARY).build();ExcerptAppenderappender=queue.acquireAppender();// 写入appender.writeDocument(w->w.write("orderId").int64(1001).write("amount").float64(299.00).write("status").text("pending"));// 读取ExcerptTailertailer=queue.createTailer();while(tailer.readDocument(r->{longorderId=r.read("orderId").int64();doubleamount=r.read("amount").float64();Stringstatus=r.read("status").text();System.out.println(orderId+", "+amount+", "+status);})){// 继续读取}服务重启后数据恢复
这是 Chronicle Queue 最厉害的地方——重启后消息还在:
publicclassDataRecoveryDemo{publicstaticvoidmain(String[]args){Stringpath="./data/orders";// 第一次运行:写入数据writeData(path);// 模拟进程重启...// 第二次运行:读取数据(刚才写的还在)readData(path);}privatestaticvoidwriteData(Stringpath){ChronicleQueuequeue=ChronicleQueueBuilder.single(path).build();ExcerptAppenderappender=queue.acquireAppender();for(inti=0;i<100;i++){appender.writeText("订单 "+i);}queue.close();System.out.println("写入完成,进程退出");}privatestaticvoidreadData(Stringpath){ChronicleQueuequeue=ChronicleQueueBuilder.single(path).build();ExcerptTailertailer=queue.createTailer();intcount=0;while(true){Stringtext=tailer.readText();if(text==null)break;count++;}System.out.println("重启后读到 "+count+" 条消息");queue.close();}}输出:
写入完成,进程退出 重启后读到 100 条消息异步写入
Chronicle Queue 默认是同步写入内存(但不刷盘),如果需要更高吞吐:
// 异步写入(不等待)appender.writeText("订单",()->"异步订单数据");// 或者批量写入for(inti=0;i<10000;i++){appender.writeText("订单 "+i);// 先写入到内存,积累一定量后批量刷盘}指定读取位置
有时候不需要从头读,只想从某个时间点开始:
// 从指定时间开始读取longfromTime=System.currentTimeMillis()-3600_000;// 1小时前ExcerptTailertailer=queue.createTailer().toStart()// 从头开始// 或// .toEnd() // 只读新消息// .to(1234567890L) // 读到指定位置;// 遍历while(true){Stringtext=tailer.readText();if(text==null)break;// 处理...}为什么不都用 Chronicle Queue
说了这么多好处,为什么大家还是在用 Kafka?
Chronicle Queue 的局限:
- 不能分布式 - 只能单机玩,想扩容?没门
- 不支持多消费者 - 一条消息只能被一个进程拿走
- 数据堆积有限 - 单机磁盘多大,它就能存多少
- 没有生态 - 没有管理后台、没有监控面板
说白了:
Chronicle Queue 就像一辆超跑,赛道无敌,但上不了高速(不能分布式)。
Kafka 就像 SUV,什么都能干,虽然跑赛道不如超跑,但胜在能拉人能越野(支持集群、广播、海量数据)。
选谁:
- 同机器进程间通信、高性能低延迟 → Chronicle Queue
- 跨机器、分布式、海量数据 → Kafka/RabbitMQ
两者不是一个赛道的,没有谁取代谁的说法。
注意事项
1. 磁盘空间
数据不会自动清理,需要定期清理旧 cycle 文件:
# 手动清理 3 天前的文件find./data/orders-name"*.data"-mtime+3-delete# 或配置 Chronicle Queue 自动清理ChronicleQueueBuilder.single(path).rollCycle(RollCycles.MINUTELY)// 按分钟轮转 .build();2. 数据安全
如果需要更严格的数据安全(同步刷盘):
ChronicleQueuequeue=ChronicleQueueBuilder.single(path).forceWrites(true)// 每次写入都同步刷盘(会变慢).build();3. 读取的唯一性
多个进程读同一个 Queue,每条消息可以被每个进程读到一次(不是广播模式)。
如果需要广播,用 Chronicle Broadcast。
总结
Chronicle Queue 就是一个能落盘的 Disruptor:
- MMAP让写入像内存一样快
- cycle 文件实现数据持久化和自动轮转
- 支持重放让故障恢复变得简单
适合场景:日志持久化、事件溯源、低延迟进程间通信。
如果你的业务需要高性能又不希望丢数据,Chronicle Queue 是一个很好的选择!