news 2026/6/16 10:30:05

数据库向量化执行引擎:从 Volcano 到列式处理的性能跃迁

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
数据库向量化执行引擎:从 Volcano 到列式处理的性能跃迁

数据库向量化执行引擎:从 Volcano 到列式处理的性能跃迁

一、行式执行的瓶颈:一次一行,CPU 的灾难

传统数据库执行引擎采用 Volcano(火山)模型——每个算子通过next()方法逐行传递数据。这种模型实现简单,但对 CPU 极不友好:每行数据都需要一次虚函数调用、一次分支判断和一次数据拷贝。当处理百万行数据时,这些开销累积成巨大的性能浪费。

向量化执行的核心思想是"一次处理一批行"(Batch-at-a-Time)。每个算子接收一批行(如 1024 行),对整批数据执行同一操作,利用 CPU 的 SIMD 指令和缓存局部性。向量化执行不是简单的"批处理"——它要求算子内部按列组织数据,使得连续内存访问和 SIMD 指令成为可能。

二、向量化执行架构:从 Volcano 到 Morsel-Driven

向量化执行引擎的架构变化体现在三个层面:数据组织从行式变为列式,算子接口从next()变为next_batch(),执行模型从推模式变为拉模式+向量化。Morsel-Driven 模型进一步将数据分片为 Morsel(如 10000 行),每个 Morsel 由一个线程处理,实现并行化。

flowchart TB subgraph Volcano模型 A1[Scan] -->|1行| B1[Filter] B1 -->|1行| C1[Project] C1 -->|1行| D1[Aggregate] end subgraph 向量化模型 A2[BatchScan<br/>1024行] -->|1024行| B2[BatchFilter<br/>列式处理] B2 -->|N行| C2[BatchProject<br/>列式计算] C2 -->|N行| D2[BatchAggregate<br/>向量化聚合] end subgraph Morsel-Driven并行 E1[Morsel 1<br/>线程1] --> F1[BatchFilter] E2[Morsel 2<br/>线程2] --> F2[BatchFilter] E3[Morsel 3<br/>线程3] --> F3[BatchFilter] F1 --> G[结果合并] F2 --> G F3 --> G end A2 --> E1 A2 --> E2 A2 --> E3

向量化执行的性能提升来自三个因素:减少虚函数调用(1024 行一次调用 vs 1024 次调用)、SIMD 指令并行(一条指令处理 4-8 个数据)、缓存友好(列式数据连续存储,CPU 预取效率高)。

三、生产级代码实现:向量化算子与列式存储

3.1 列式数据批次

import numpy as np from dataclasses import dataclass from typing import List, Dict, Optional class VectorBatch: """列式数据批次""" def __init__(self, capacity: int = 1024): self.capacity = capacity self.columns: Dict[str, np.ndarray] = {} self.num_rows = 0 self.selection_vector: Optional[np.ndarray] = None # selection_vector 记录有效行的索引 # 为什么用 selection vector 而非物理删除: # 物理删除需要移动数据,开销大; # selection vector 只记录有效行索引, # 后续算子通过索引访问有效数据 def add_column(self, name: str, dtype: str, data: np.ndarray): """添加一列数据""" self.columns[name] = data.astype(dtype) def get_column(self, name: str) -> np.ndarray: """获取一列数据(考虑 selection vector)""" col = self.columns[name] if self.selection_vector is not None: return col[self.selection_vector[:self.num_rows]] return col[:self.num_rows] def filter(self, mask: np.ndarray): """根据布尔掩码过滤行""" # 向量化过滤:用布尔索引一次性选出有效行 # 为什么用布尔掩码而非逐行判断: # NumPy 的布尔索引在 C 层执行, # 比 Python 循环快 50-100 倍 indices = np.where(mask)[0].astype(np.int32) self.selection_vector = indices self.num_rows = len(indices) def slice(self, offset: int, length: int) -> "VectorBatch": """切片生成子批次""" batch = VectorBatch(length) for name, col in self.columns.items(): batch.add_column(name, col.dtype.str, col[offset:offset + length]) batch.num_rows = min(length, self.num_rows - offset) return batch

3.2 向量化算子实现

class VectorizedFilter: """向量化过滤算子""" def __init__(self, column: str, operator: str, value): self.column = column self.operator = operator self.value = value def execute(self, batch: VectorBatch) -> VectorBatch: """执行向量化过滤""" col = batch.get_column(self.column) # 向量化比较:一次比较整列 # 为什么用 NumPy 比较而非 Python 循环: # NumPy 的比较操作在 C 层执行, # 且可以利用 SIMD 指令并行处理 if self.operator == "=": mask = col == self.value elif self.operator == ">": mask = col > self.value elif self.operator == "<": mask = col < self.value elif self.operator == ">=": mask = col >= self.value elif self.operator == "<=": mask = col <= self.value elif self.operator == "!=": mask = col != self.value else: raise ValueError(f"不支持的操作符: {self.operator}") # 应用过滤 batch.filter(mask) return batch class VectorizedAggregation: """向量化聚合算子""" def __init__(self, group_by: List[str], aggregates: Dict[str, str]): # group_by: 分组列名列表 # aggregates: {输出列名: 聚合表达式} # 如 {"total": "SUM(amount)", "count": "COUNT(*)"} self.group_by = group_by self.aggregates = aggregates def execute(self, batches: List[VectorBatch]) -> VectorBatch: """执行向量化聚合""" # 合并所有批次 all_groups = {} # 为什么用字典而非排序聚合: # 哈希聚合对大数据集更高效(O(N)), # 排序聚合需要 O(N log N); # 但哈希聚合的内存占用更高 for batch in batches: # 提取分组键 group_keys = self._extract_group_keys(batch) # 对每个分组执行聚合 for i in range(batch.num_rows): key = tuple(group_keys[col][i] for col in self.group_by) if key not in all_groups: all_groups[key] = self._init_accumulators() self._accumulate(all_groups[key], batch, i) # 构建输出批次 return self._build_output(all_groups) def _extract_group_keys(self, batch) -> Dict[str, np.ndarray]: return {col: batch.get_column(col) for col in self.group_by} def _init_accumulators(self) -> dict: accum = {} for name, expr in self.aggregates.items(): if expr.startswith("SUM"): accum[name] = 0.0 elif expr.startswith("COUNT"): accum[name] = 0 elif expr.startswith("AVG"): accum[name] = {"sum": 0.0, "count": 0} elif expr.startswith("MAX"): accum[name] = float("-inf") elif expr.startswith("MIN"): accum[name] = float("inf") return accum def _accumulate(self, accum: dict, batch: VectorBatch, row_idx: int): for name, expr in self.aggregates.items(): # 提取聚合列名 col_name = expr.split("(")[1].rstrip(")") if col_name == "*": accum[name] += 1 continue value = batch.get_column(col_name)[row_idx] if expr.startswith("SUM"): accum[name] += value elif expr.startswith("COUNT"): accum[name] += 1 elif expr.startswith("AVG"): accum[name]["sum"] += value accum[name]["count"] += 1 elif expr.startswith("MAX"): accum[name] = max(accum[name], value) elif expr.startswith("MIN"): accum[name] = min(accum[name], value) def _build_output(self, groups: dict) -> VectorBatch: num_groups = len(groups) batch = VectorBatch(num_groups) # 写入分组列 for i, col_name in enumerate(self.group_by): data = np.array([key[i] for key in groups.keys()]) batch.add_column(col_name, data.dtype.str, data) # 写入聚合列 for name, expr in self.aggregates.items(): if expr.startswith("AVG"): values = np.array([ v["sum"] / v["count"] for v in groups.values() ], dtype=np.float64) else: values = np.array( list(groups.values()), dtype=np.float64 ) batch.add_column(name, values.dtype.str, values) batch.num_rows = num_groups return batch

3.3 向量化 Scan 算子

class VectorizedTableScan: """向量化表扫描算子""" def __init__(self, table_name: str, columns: List[str], batch_size: int = 1024): self.table_name = table_name self.columns = columns self.batch_size = batch_size def execute(self) -> List[VectorBatch]: """分批扫描表数据""" batches = [] # 从列式存储中分批读取 # 为什么分批读取而非全量加载: # 全量加载百万行数据会占用数 GB 内存; # 分批读取内存占用恒定(batch_size × 列数 × 8字节) offset = 0 while True: batch = self._read_batch(offset, self.batch_size) if batch.num_rows == 0: break batches.append(batch) offset += self.batch_size return batches def _read_batch(self, offset: int, limit: int) -> VectorBatch: """从列式存储读取一批数据""" batch = VectorBatch(limit) for col_name in self.columns: # 列式存储:每列独立存储,连续读取 # 为什么列式存储对向量化重要: # 行式存储中同一列的数据不连续, # CPU 缓存行预取效率低; # 列式存储中同一列数据连续排列, # 一次缓存行加载可以处理多个值 data = self._read_column(col_name, offset, limit) batch.add_column(col_name, data.dtype.str, data) batch.num_rows = len(data) return batch def _read_column(self, col_name: str, offset: int, limit: int) -> np.ndarray: """从存储层读取一列数据(占位实现)""" # 实际实现:从 Parquet/ORC 文件读取 return np.zeros(limit)

四、向量化执行的架构权衡:内存、兼容性与编译优化

内存占用的增加:向量化执行需要将数据从行式转换为列式,中间结果的内存占用可能增加。行式存储中一行数据紧密排列,列式存储中一列数据紧密排列但不同列分散。建议在 Scan 算子中按需读取查询涉及的列,避免读取不需要的列。

与现有行式系统的兼容:向量化执行需要列式存储支持。在行式存储(如 InnoDB)上做向量化,需要先将行数据转换为列式批次,转换开销可能抵消向量化的收益。ClickHouse 和 DuckDB 从存储层就是列式的,向量化收益最大。

表达式编译 vs 解释执行:向量化算子的表达式求值可以用解释执行(遍历 AST)或编译执行(生成机器码)。编译执行(如 Hyper 的 LLVM 编译)性能更高,但编译时间增加了查询延迟。建议对短查询用解释执行,对长查询用编译执行。

SIMD 的可移植性:不同 CPU 的 SIMD 宽度不同(SSE 128bit、AVX2 256bit、AVX-512 512bit)。手写 SIMD 代码的可移植性差,建议使用编译器自动向量化或跨平台 SIMD 库(如 xsimd)。

五、总结

向量化执行引擎通过"一次处理一批行"和"列式数据组织"大幅提升 CPU 利用率。核心收益来自减少虚函数调用、SIMD 并行和缓存友好。落地时建议从 Scan → Filter → Project 的简单链路开始验证,再逐步实现 Join 和 Aggregate 等复杂算子。列式存储是向量化的前提,行式存储上的向量化收益有限。DuckDB 是嵌入式向量化引擎的参考实现,值得研究其算子设计。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/16 10:29:12

Sqribble文档操作系统:模板即规则,排版即工程

1. 项目概述&#xff1a;当模板不再是“套壳”&#xff0c;而是一套可执行的文档操作系统 你有没有过这种体验&#xff1a;手头有一篇写得不错的行业分析&#xff0c;想快速变成一份体面的PDF报告发给客户&#xff1b;或者刚整理完一套培训材料&#xff0c;却卡在排版上——调字…

作者头像 李华
网站建设 2026/6/16 10:29:00

MIPS-Linux-GNU交叉编译工具链:嵌入式开发的核心基础设施

1. 项目概述&#xff1a;从“mips-linux-gnu”说起&#xff0c;一个交叉编译工具链的深度解析如果你在嵌入式开发&#xff0c;尤其是涉及路由器、网络设备、物联网终端或者一些老旧但仍在服役的专用设备时&#xff0c;很可能会在项目的构建脚本或者文档里遇到mips-linux-gnu这个…

作者头像 李华
网站建设 2026/6/16 10:28:11

Cartographer详细讲解

1. Cartographer 是什么&#xff1f;Cartographer 是 Google 开源的实时 SLAM 系统&#xff0c;支持 2D 和 3D&#xff0c;同一套系统可以适配不同传感器配置&#xff0c;例如 2D 激光、3D 点云、IMU、轮速里程计等。它的核心思想是&#xff1a;前端构建局部一致的子地图&#…

作者头像 李华
网站建设 2026/6/16 10:23:54

快捷支付 VS 网关支付 要点速览

一、支付流程与操作快捷支付&#xff1a;提前绑定银行卡或支付账户&#xff0c;付款无需填写卡号、有效期等信息&#xff0c;一键确认即可完成&#xff0c;流程极简。网关支付&#xff1a;无需预先绑卡&#xff0c;付款跳转至第三方支付网关&#xff0c;手动填写卡号、有效期、…

作者头像 李华
网站建设 2026/6/16 10:22:57

告别手速焦虑:大麦网自动抢票工具终极指南

告别手速焦虑&#xff1a;大麦网自动抢票工具终极指南 【免费下载链接】Autoticket 大麦网自动抢票工具 项目地址: https://gitcode.com/gh_mirrors/au/Autoticket 还在为抢不到演唱会门票而烦恼吗&#xff1f;面对热门演出开票瞬间的秒杀&#xff0c;手动操作往往难以应…

作者头像 李华
网站建设 2026/6/16 10:21:58

“四十载笔耕沂蒙大地,融联入诗探索旧体新路”李日升的创作之路

生于莒县浮来山下&#xff0c;扎根沂蒙乡土数十载&#xff0c;李日升身兼高校教授、工商管理博士&#xff0c;从政履职、治学研学之余&#xff0c;坚持诗词创作四十余年。多年来他立足本土风物、日常见闻与人情世事&#xff0c;在创作中探索楹联与七言旧体诗相融的写作方式&…

作者头像 李华