news 2026/4/20 5:56:20

Chronicle Queue:把 Disruptor 的数据落盘

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Chronicle Queue:把 Disruptor 的数据落盘

之前聊过 Disruptor(高性能队列 Disruptor),它的性能逆天,但有个致命问题:纯内存,进程挂了数据就丢了

Chronicle Queue 就是来解决这个问题的——持久化的 Disruptor

解决什么问题

Disruptor 的问题: - 进程崩溃 → 数据全丢 - 重启后无法恢复之前的消息 - 只能作为临时缓冲 Chronicle Queue 的改进: - 数据落盘,进程重启不丢失 - 支持消息重放(Replay) - 保留了 Disruptor 的高性能

同样的团队(LMAX),同样的设计理念,只是一个加了持久化。

核心原理

MMAP 内存映射

Chronicle Queue 之所以快,关键在于MMAP(内存映射)

// 传统文件写入write(fd,data,size);// 用户态 → 内核态 → 磁盘// MMAP 写入memcpy(buffer,data,size);// 直接写内存,由 OS 异步刷盘

MMAP 把磁盘文件映射到内存,写入像写内存一样快,操作系统会自己在后台刷盘。

文件结构

chronicle-queue/ ├── 20190101.data # cycle 文件,约 64MB ├── 20190102.data ├── 20190103.data └── 20190104.data 每个文件叫做一个 cycle,写满自动切换下一个

数据写入流程

// 1. 写入数据到 MMAP 内存appender.writeText("订单创建: orderId=1001");// 2. 数据立即可读(内存读取)Stringmsg=tailer.readText();// 3. OS 在后台异步刷盘(不需要等待)

快速开始

引入依赖

<dependency><groupId>net.openhft</groupId><artifactId>chronicle_queue</artifactId><version>5.22.4</version></dependency>

基础用法

publicclassChronicleQueueDemo{publicstaticvoidmain(String[]args){Stringpath="./data/orders";// 1. 创建队列(单机模式)ChronicleQueuequeue=ChronicleQueueBuilder.single(path)// 单文件模式.build();// 2. 获取写入器ExcerptAppenderappender=queue.acquireAppender();// 3. 写入消息(支持多种格式)appender.writeText("订单1: 100元");appender.writeText("订单2: 200元");appender.writeText("订单3: 300元");// 4. 获取读取器ExcerptTailertailer=queue.createTailer();// 5. 读取所有消息while(true){Stringtext=tailer.readText();if(text==null){break;// 没有更多消息}System.out.println("收到: "+text);}queue.close();}}

输出

收到: 订单1: 100元 收到: 订单2: 200元 收到: 订单3: 300元

写入/读取二进制数据

Text 方便但不类型安全,实际项目推荐用二进制:

// 1. 定义 Order 消息@DatapublicclassOrder{privatelongorderId;privatedoubleamount;privateStringstatus;}// 2. 使用 BinaryWire 写入ChronicleQueuequeue=ChronicleQueueBuilder.single("./data/orders").wireType(WireType.BINARY).build();ExcerptAppenderappender=queue.acquireAppender();// 写入appender.writeDocument(w->w.write("orderId").int64(1001).write("amount").float64(299.00).write("status").text("pending"));// 读取ExcerptTailertailer=queue.createTailer();while(tailer.readDocument(r->{longorderId=r.read("orderId").int64();doubleamount=r.read("amount").float64();Stringstatus=r.read("status").text();System.out.println(orderId+", "+amount+", "+status);})){// 继续读取}

服务重启后数据恢复

这是 Chronicle Queue 最厉害的地方——重启后消息还在

publicclassDataRecoveryDemo{publicstaticvoidmain(String[]args){Stringpath="./data/orders";// 第一次运行:写入数据writeData(path);// 模拟进程重启...// 第二次运行:读取数据(刚才写的还在)readData(path);}privatestaticvoidwriteData(Stringpath){ChronicleQueuequeue=ChronicleQueueBuilder.single(path).build();ExcerptAppenderappender=queue.acquireAppender();for(inti=0;i<100;i++){appender.writeText("订单 "+i);}queue.close();System.out.println("写入完成,进程退出");}privatestaticvoidreadData(Stringpath){ChronicleQueuequeue=ChronicleQueueBuilder.single(path).build();ExcerptTailertailer=queue.createTailer();intcount=0;while(true){Stringtext=tailer.readText();if(text==null)break;count++;}System.out.println("重启后读到 "+count+" 条消息");queue.close();}}

输出:

写入完成,进程退出 重启后读到 100 条消息

异步写入

Chronicle Queue 默认是同步写入内存(但不刷盘),如果需要更高吞吐:

// 异步写入(不等待)appender.writeText("订单",()->"异步订单数据");// 或者批量写入for(inti=0;i<10000;i++){appender.writeText("订单 "+i);// 先写入到内存,积累一定量后批量刷盘}

指定读取位置

有时候不需要从头读,只想从某个时间点开始:

// 从指定时间开始读取longfromTime=System.currentTimeMillis()-3600_000;// 1小时前ExcerptTailertailer=queue.createTailer().toStart()// 从头开始// 或// .toEnd() // 只读新消息// .to(1234567890L) // 读到指定位置;// 遍历while(true){Stringtext=tailer.readText();if(text==null)break;// 处理...}

为什么不都用 Chronicle Queue

说了这么多好处,为什么大家还是在用 Kafka?

Chronicle Queue 的局限:

  • 不能分布式 - 只能单机玩,想扩容?没门
  • 不支持多消费者 - 一条消息只能被一个进程拿走
  • 数据堆积有限 - 单机磁盘多大,它就能存多少
  • 没有生态 - 没有管理后台、没有监控面板

说白了:

Chronicle Queue 就像一辆超跑,赛道无敌,但上不了高速(不能分布式)。

Kafka 就像 SUV,什么都能干,虽然跑赛道不如超跑,但胜在能拉人能越野(支持集群、广播、海量数据)。

选谁:

  • 同机器进程间通信、高性能低延迟 → Chronicle Queue
  • 跨机器、分布式、海量数据 → Kafka/RabbitMQ

两者不是一个赛道的,没有谁取代谁的说法。

注意事项

1. 磁盘空间

数据不会自动清理,需要定期清理旧 cycle 文件:

# 手动清理 3 天前的文件find./data/orders-name"*.data"-mtime+3-delete# 或配置 Chronicle Queue 自动清理ChronicleQueueBuilder.single(path).rollCycle(RollCycles.MINUTELY)// 按分钟轮转 .build();

2. 数据安全

如果需要更严格的数据安全(同步刷盘):

ChronicleQueuequeue=ChronicleQueueBuilder.single(path).forceWrites(true)// 每次写入都同步刷盘(会变慢).build();

3. 读取的唯一性

多个进程读同一个 Queue,每条消息可以被每个进程读到一次(不是广播模式)。

如果需要广播,用 Chronicle Broadcast。

总结

Chronicle Queue 就是一个能落盘的 Disruptor

  • MMAP让写入像内存一样快
  • cycle 文件实现数据持久化和自动轮转
  • 支持重放让故障恢复变得简单

适合场景:日志持久化、事件溯源、低延迟进程间通信。

如果你的业务需要高性能又不希望丢数据,Chronicle Queue 是一个很好的选择!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/20 5:45:38

Janus-Pro-7B赋能运维可视化:自动生成服务器监控图表分析报告

Janus-Pro-7B赋能运维可视化&#xff1a;自动生成服务器监控图表分析报告 每次凌晨被告警电话叫醒&#xff0c;睡眼惺忪地打开监控大盘&#xff0c;面对几十张密密麻麻、曲线乱舞的性能图表&#xff0c;你是不是也感到一阵头疼&#xff1f;CPU使用率突然飙升&#xff0c;是业务…

作者头像 李华
网站建设 2026/4/20 5:43:29

Ollama本地模型管理利器:与星图云端Qwen3-14B-AWQ协同工作流

Ollama本地模型管理利器&#xff1a;与星图云端Qwen3-14B-AWQ协同工作流 1. 混合AI部署的新思路 在AI应用开发中&#xff0c;我们常常面临一个两难选择&#xff1a;是追求高性能的云端大模型&#xff0c;还是选择响应更快的本地轻量模型&#xff1f;这个问题在资源有限的中小…

作者头像 李华