分类:6.数据订阅 TMQ |篇章:03 TMQ 内部机制
适用版本:TDengine v3.x(v3.3.x / v3.4.x) | 最后更新:2026-07-04
TMQ 本质上是"WAL 之上的过滤式订阅"。本文深入剖析 TMQ 如何复用 WAL 数据、Topic SQL 如何被推到 VNode 端执行、Consumer Group 协调者如何工作、以及位点管理的实现细节。
核心概念速查表
| 概念 | 说明 |
|---|---|
| WAL 复用 | TMQ 直接从 WAL 读取,不额外存储 |
| Topic SQL Pushdown | 过滤在 VNode 端执行 |
| Coordinator | MNode 上的消费组协调者 |
| Offset Map | (group, topic, vgroup) → offset |
| Long Poll | 长轮询拉取 |
| WAL Retention | WAL 保留期(决定可订阅范围) |
详细解析
1. TMQ 与 WAL 的关系
TMQ 数据来源: 写入数据 → WAL(持久化日志)→ MemTable → TSDB 文件 ↑ TMQ 从这里读取 优势: - 不需要单独的消息存储 - 数据天然按写入顺序排列 - WAL 已经保证持久性 WAL 中包含: - 数据写入(INSERT) - Schema 变更(ALTER TABLE) - 子表创建/删除 - 删除/更新操作 可订阅范围: - 受 WAL_RETENTION_PERIOD 限制 - WAL 被清理 = 数据无法订阅 - 调大保留期 → 消费历史能力增强(但占磁盘)2. Topic SQL 下推执行
Topic SQL 执行位置: CREATE TOPIC t_high AS SELECT * FROM meters WHERE current > 100; 执行流程: Consumer.poll() ↓ Fetch Request (vgroup=3, offset=N) ↓ VNode 3 收到请求 ↓ 从 WAL 读取 offset N 之后的 Record ↓ 对每个 Record 应用 SQL: - 解析为内存数据结构 - 应用 WHERE 过滤 - 投影 SELECT 列 ↓ 打包过滤后数据 ↓ 返回 Consumer 关键优化: - SQL 计划缓存(避免每次重 Parser) - 列投影减少传输 - 谓词早期过滤3. Coordinator 角色
MNode 上的 Consumer Group 协调者: 职责: - 维护 Consumer Group 成员列表 - 处理 Subscribe / Unsubscribe - 触发 Rebalance - 持久化 Offset - 监控 Consumer 心跳 - 故障检测 协调消息: - JoinGroup: Consumer 加入 - SyncGroup: 获取分配方案 - Heartbeat: 保活 - CommitOffset: 提交位点 - LeaveGroup: 离开 协调者状态: - Empty: 无活跃成员 - Preparing: Rebalance 准备中 - Stable: 稳定消费 - Dead: 组已删除4. Offset 管理
Offset 元数据结构: 存储位置:MNode 内部表 Schema: group_id (varchar) topic_name (varchar) vgroup_id (int) offset (int64) commit_time (timestamp) Commit 持久化: ① Consumer 发送 Commit RPC ② MNode 写入 Offset 元数据 ③ 多副本同步 ④ 返回成功 读取 Offset: ① Consumer 启动时拉取自己的 Offset ② Rebalance 后从 Offset 继续 ③ Offset 不存在 → 用 auto.offset.reset 策略5. Long Poll 机制
长轮询拉取: Consumer.poll(timeout=1.0) ↓ Fetch Request → VNode ↓ VNode 检查: if 有未读消息: 立即返回 else: 等待最长 timeout 期间有新数据则立即返回 超时则返回空 优势: - 减少空轮询 - 实时性好(新数据即时推送) - 服务端负载可控 对比短轮询: 短轮询:客户端每 100ms 询问一次 → 9 成是空回 长轮询:等待新数据或超时 → 几乎不浪费6. 多 VGroup 并行消费
单 Consumer 处理多个 VGroup: Consumer-1 被分配 VG1, VG2, VG3 内部实现: Thread / async: - 与 VG1 维持 Fetch 连接 - 与 VG2 维持 Fetch 连接 - 与 VG3 维持 Fetch 连接 并发拉取,合并消息队列 Poll 返回: 合并后的消息批 各 VGroup 独立 Offset 负载均衡: - 各 VGroup 数据不均时 → 处理快的优先 - 不会因为某 VG 慢导致整体卡住7. Schema 变更的传递
Topic 数据流中遇到 ALTER TABLE: ① 服务端 ALTER STABLE meters ADD COLUMN x ② WAL 中记录该 Schema 变更 ③ Consumer Fetch 时遇到 Schema 变更记录 ④ 返回特殊事件给 Consumer ⑤ Consumer 检测到事件 → 更新本地 Schema 缓存 ⑥ 后续数据按新 Schema 解析 应用处理: - 监听 Schema 变更事件 - 动态适应新列 - 或忽略新列继续处理已知列8. 重启与恢复
Consumer 重启场景: ① 关闭 Consumer 时: - 发送 LeaveGroup - 触发 Rebalance - 其他 Consumer 接管分区 ② 重启 Consumer: - 重新 JoinGroup - 触发 Rebalance - 从 Committed Offset 继续 ③ Consumer 崩溃(无 LeaveGroup): - 心跳超时(默认 30s) - MNode 标记失联 - 触发 Rebalance - 其他成员接管 会话超时 vs 重平衡耗时: - session.timeout.ms 过小 → 误判失联频繁 Rebalance - 过大 → 故障感知慢 - 推荐 30~60 秒代码示例
查看 TMQ 内部状态
-- 所有订阅SELECT*FROMinformation_schema.ins_subscriptions;-- Consumer 详情SELECT*FROMperformance_schema.perf_consumers;-- WAL 保留期SELECTname,wal_retention_period,wal_retention_sizeFROMinformation_schema.ins_databases;配置 WAL 保留
-- 创建数据库时配置(保留 30 天 WAL)CREATEDATABASEdb WAL_RETENTION_PERIOD2592000WAL_RETENTION_SIZE10000;-- 修改现有数据库ALTERDATABASEdb WAL_RETENTION_PERIOD2592000;Rebalance 日志监控
# taosd.log 中查找grep"rebalance"/var/log/taos/taosdlog.0grep"consumer.*joined"/var/log/taos/taosdlog.0grep"consumer.*left"/var/log/taos/taosdlog.0性能考量
TMQ 性能特点
| 维度 | 表现 |
|---|---|
| 写入侧 | 无额外开销(WAL 本就要写) |
| Fetch 侧 | 受 WAL 读速度限制 |
| 过滤计算 | 取决于 SQL 复杂度 |
| 元数据 | Commit 频率影响 MNode 负载 |
影响 TMQ 性能的关键
| 因素 | 影响 |
|---|---|
| Topic SQL 复杂度 | 高 |
| WAL 保留期 | 影响磁盘+IO |
| Consumer 数 | 与并行度直接相关 |
| 批量大小 | 摊薄 RPC 开销 |
FAQ
Q1: TMQ 性能上限?
单 VGroup 消费可达几十万行/秒。总吞吐与 VGroup 数线性扩展。
Q2: Topic SQL 能用所有 SQL 语法吗?
主要支持:SELECT 投影、WHERE 过滤。不支持 JOIN、聚合、窗口、子查询。复杂分析推荐用流计算。
Q3: 删除 Topic 影响 Consumer 吗?
Consumer 的 Poll 会失败。需要订阅其他 Topic 或关闭。
Q4: Consumer Group 怎么管理?
SHOWCONSUMERS;SHOWSUBSCRIPTIONS;DROPCONSUMERGROUP<group>ON<topic>;Q5: TMQ 和流计算的关系?
流计算可以订阅 Topic 作为输入;流计算的输出可以再创建 Topic 给下游消费。两者形成完整数据流水线。
参考
系统构架篇
- 01-《TDengine 整体架构全景》
- 02-《集群拓扑深度解析》
- 03-《MNode 内部机制深度解析》
- 04-《RPC 通信层深度解析》
- 05-《VNode 生命周期》
- 06-《RAFT 共识协议》
- 07-《端到端的消息流》
数据模型
- 01-《数据库创建与参数详解》
- 02-《超级表/子表/普通表》
- 03-《支持数据类型深度解析》
- 04-《TDengine Tag 设计哲学与 Schema 变更机制》
- 05-《TDengine 虚拟表实现原理》
存储引擎
- 01-《TDengine 存储引擎概览》
- 02-《TDengine MemTable 深度解析》
- 03-《TDengine WAL 预写日志机制》
- 04-《TDengine 数据文件格式》
- 05-《TDengine Commit 与 Flush 机制 》
- 06-《TDengine Compaction 合并策略 》
- 07-《TDengine 数据保留与 TTL》
- 08-《TDengine 压缩编码机制》
- 09-《TDengine Cache 与 Last 查询加速》
- 10-《TDengine 逻辑计划生成》
查询引擎
- 01-《TDengine 查询引擎概览》
- 02-《TDengine SQL 解析与词法分析》
- 03-《TDengine 语义分析与 AST 重写》
- 04-《TDengine 逻辑计划生成》
- 05-《TDengine 物理计划生成》
- 06-《TDengine 扫描算子》
- 07-《TDengine 聚合算子》
- 08-《TDengine 聚合算子》
- 09-《TDengine 连接算子》
- 10-《TDengine 排序、填充与投影》
- 11-《TDengine 分布式查询执行》
- 12-《TDengine EXPLAIN 与查询优化》
数据写入
- 01-《TDengine SQL INSERT》
- 02-《TDengine 无模式写入》
- 03-《TDengine STMT 写入》
- 04-《TDengine 写入内部流程》
- 05-《TDengine 数据更新删除》
数据订阅
- 01-《TDengine 数据订阅》
- 02-《TDengine 订阅 vs Kafka》
- 03-《TDengine TMQ 消费流程》
关于 TDengine
TDengine 专为物联网IoT平台、工业大数据平台设计。其中,TDengine TSDB 是一款高性能、分布式的时序数据库(Time Series Database),同时它还带有内建的缓存、流式计算、数据订阅等系统功能;TDengine IDMP 是一款AI原生工业数据管理平台,它通过树状层次结构建立数据目录,对数据进行标准化、情景化,并通过 AI 提供实时分析、可视化、事件管理与报警等功能。