数据库向量化执行引擎:从 Volcano 到列式处理的性能跃迁
数据库向量化执行引擎从 Volcano 到列式处理的性能跃迁一、行式执行的瓶颈一次一行CPU 的灾难传统数据库执行引擎采用 Volcano火山模型——每个算子通过next()方法逐行传递数据。这种模型实现简单但对 CPU 极不友好每行数据都需要一次虚函数调用、一次分支判断和一次数据拷贝。当处理百万行数据时这些开销累积成巨大的性能浪费。向量化执行的核心思想是一次处理一批行Batch-at-a-Time。每个算子接收一批行如 1024 行对整批数据执行同一操作利用 CPU 的 SIMD 指令和缓存局部性。向量化执行不是简单的批处理——它要求算子内部按列组织数据使得连续内存访问和 SIMD 指令成为可能。二、向量化执行架构从 Volcano 到 Morsel-Driven向量化执行引擎的架构变化体现在三个层面数据组织从行式变为列式算子接口从next()变为next_batch()执行模型从推模式变为拉模式向量化。Morsel-Driven 模型进一步将数据分片为 Morsel如 10000 行每个 Morsel 由一个线程处理实现并行化。flowchart TB subgraph Volcano模型 A1[Scan] --|1行| B1[Filter] B1 --|1行| C1[Project] C1 --|1行| D1[Aggregate] end subgraph 向量化模型 A2[BatchScanbr/1024行] --|1024行| B2[BatchFilterbr/列式处理] B2 --|N行| C2[BatchProjectbr/列式计算] C2 --|N行| D2[BatchAggregatebr/向量化聚合] end subgraph Morsel-Driven并行 E1[Morsel 1br/线程1] -- F1[BatchFilter] E2[Morsel 2br/线程2] -- F2[BatchFilter] E3[Morsel 3br/线程3] -- F3[BatchFilter] F1 -- G[结果合并] F2 -- G F3 -- G end A2 -- E1 A2 -- E2 A2 -- E3向量化执行的性能提升来自三个因素减少虚函数调用1024 行一次调用 vs 1024 次调用、SIMD 指令并行一条指令处理 4-8 个数据、缓存友好列式数据连续存储CPU 预取效率高。三、生产级代码实现向量化算子与列式存储3.1 列式数据批次import numpy as np from dataclasses import dataclass from typing import List, Dict, Optional class VectorBatch: 列式数据批次 def __init__(self, capacity: int 1024): self.capacity capacity self.columns: Dict[str, np.ndarray] {} self.num_rows 0 self.selection_vector: Optional[np.ndarray] None # selection_vector 记录有效行的索引 # 为什么用 selection vector 而非物理删除 # 物理删除需要移动数据开销大 # selection vector 只记录有效行索引 # 后续算子通过索引访问有效数据 def add_column(self, name: str, dtype: str, data: np.ndarray): 添加一列数据 self.columns[name] data.astype(dtype) def get_column(self, name: str) - np.ndarray: 获取一列数据考虑 selection vector col self.columns[name] if self.selection_vector is not None: return col[self.selection_vector[:self.num_rows]] return col[:self.num_rows] def filter(self, mask: np.ndarray): 根据布尔掩码过滤行 # 向量化过滤用布尔索引一次性选出有效行 # 为什么用布尔掩码而非逐行判断 # NumPy 的布尔索引在 C 层执行 # 比 Python 循环快 50-100 倍 indices np.where(mask)[0].astype(np.int32) self.selection_vector indices self.num_rows len(indices) def slice(self, offset: int, length: int) - VectorBatch: 切片生成子批次 batch VectorBatch(length) for name, col in self.columns.items(): batch.add_column(name, col.dtype.str, col[offset:offset length]) batch.num_rows min(length, self.num_rows - offset) return batch3.2 向量化算子实现class VectorizedFilter: 向量化过滤算子 def __init__(self, column: str, operator: str, value): self.column column self.operator operator self.value value def execute(self, batch: VectorBatch) - VectorBatch: 执行向量化过滤 col batch.get_column(self.column) # 向量化比较一次比较整列 # 为什么用 NumPy 比较而非 Python 循环 # NumPy 的比较操作在 C 层执行 # 且可以利用 SIMD 指令并行处理 if self.operator : mask col self.value elif self.operator : mask col self.value elif self.operator : mask col self.value elif self.operator : mask col self.value elif self.operator : mask col self.value elif self.operator !: mask col ! self.value else: raise ValueError(f不支持的操作符: {self.operator}) # 应用过滤 batch.filter(mask) return batch class VectorizedAggregation: 向量化聚合算子 def __init__(self, group_by: List[str], aggregates: Dict[str, str]): # group_by: 分组列名列表 # aggregates: {输出列名: 聚合表达式} # 如 {total: SUM(amount), count: COUNT(*)} self.group_by group_by self.aggregates aggregates def execute(self, batches: List[VectorBatch]) - VectorBatch: 执行向量化聚合 # 合并所有批次 all_groups {} # 为什么用字典而非排序聚合 # 哈希聚合对大数据集更高效O(N) # 排序聚合需要 O(N log N) # 但哈希聚合的内存占用更高 for batch in batches: # 提取分组键 group_keys self._extract_group_keys(batch) # 对每个分组执行聚合 for i in range(batch.num_rows): key tuple(group_keys[col][i] for col in self.group_by) if key not in all_groups: all_groups[key] self._init_accumulators() self._accumulate(all_groups[key], batch, i) # 构建输出批次 return self._build_output(all_groups) def _extract_group_keys(self, batch) - Dict[str, np.ndarray]: return {col: batch.get_column(col) for col in self.group_by} def _init_accumulators(self) - dict: accum {} for name, expr in self.aggregates.items(): if expr.startswith(SUM): accum[name] 0.0 elif expr.startswith(COUNT): accum[name] 0 elif expr.startswith(AVG): accum[name] {sum: 0.0, count: 0} elif expr.startswith(MAX): accum[name] float(-inf) elif expr.startswith(MIN): accum[name] float(inf) return accum def _accumulate(self, accum: dict, batch: VectorBatch, row_idx: int): for name, expr in self.aggregates.items(): # 提取聚合列名 col_name expr.split(()[1].rstrip()) if col_name *: accum[name] 1 continue value batch.get_column(col_name)[row_idx] if expr.startswith(SUM): accum[name] value elif expr.startswith(COUNT): accum[name] 1 elif expr.startswith(AVG): accum[name][sum] value accum[name][count] 1 elif expr.startswith(MAX): accum[name] max(accum[name], value) elif expr.startswith(MIN): accum[name] min(accum[name], value) def _build_output(self, groups: dict) - VectorBatch: num_groups len(groups) batch VectorBatch(num_groups) # 写入分组列 for i, col_name in enumerate(self.group_by): data np.array([key[i] for key in groups.keys()]) batch.add_column(col_name, data.dtype.str, data) # 写入聚合列 for name, expr in self.aggregates.items(): if expr.startswith(AVG): values np.array([ v[sum] / v[count] for v in groups.values() ], dtypenp.float64) else: values np.array( list(groups.values()), dtypenp.float64 ) batch.add_column(name, values.dtype.str, values) batch.num_rows num_groups return batch3.3 向量化 Scan 算子class VectorizedTableScan: 向量化表扫描算子 def __init__(self, table_name: str, columns: List[str], batch_size: int 1024): self.table_name table_name self.columns columns self.batch_size batch_size def execute(self) - List[VectorBatch]: 分批扫描表数据 batches [] # 从列式存储中分批读取 # 为什么分批读取而非全量加载 # 全量加载百万行数据会占用数 GB 内存 # 分批读取内存占用恒定batch_size × 列数 × 8字节 offset 0 while True: batch self._read_batch(offset, self.batch_size) if batch.num_rows 0: break batches.append(batch) offset self.batch_size return batches def _read_batch(self, offset: int, limit: int) - VectorBatch: 从列式存储读取一批数据 batch VectorBatch(limit) for col_name in self.columns: # 列式存储每列独立存储连续读取 # 为什么列式存储对向量化重要 # 行式存储中同一列的数据不连续 # CPU 缓存行预取效率低 # 列式存储中同一列数据连续排列 # 一次缓存行加载可以处理多个值 data self._read_column(col_name, offset, limit) batch.add_column(col_name, data.dtype.str, data) batch.num_rows len(data) return batch def _read_column(self, col_name: str, offset: int, limit: int) - np.ndarray: 从存储层读取一列数据占位实现 # 实际实现从 Parquet/ORC 文件读取 return np.zeros(limit)四、向量化执行的架构权衡内存、兼容性与编译优化内存占用的增加向量化执行需要将数据从行式转换为列式中间结果的内存占用可能增加。行式存储中一行数据紧密排列列式存储中一列数据紧密排列但不同列分散。建议在 Scan 算子中按需读取查询涉及的列避免读取不需要的列。与现有行式系统的兼容向量化执行需要列式存储支持。在行式存储如 InnoDB上做向量化需要先将行数据转换为列式批次转换开销可能抵消向量化的收益。ClickHouse 和 DuckDB 从存储层就是列式的向量化收益最大。表达式编译 vs 解释执行向量化算子的表达式求值可以用解释执行遍历 AST或编译执行生成机器码。编译执行如 Hyper 的 LLVM 编译性能更高但编译时间增加了查询延迟。建议对短查询用解释执行对长查询用编译执行。SIMD 的可移植性不同 CPU 的 SIMD 宽度不同SSE 128bit、AVX2 256bit、AVX-512 512bit。手写 SIMD 代码的可移植性差建议使用编译器自动向量化或跨平台 SIMD 库如 xsimd。五、总结向量化执行引擎通过一次处理一批行和列式数据组织大幅提升 CPU 利用率。核心收益来自减少虚函数调用、SIMD 并行和缓存友好。落地时建议从 Scan → Filter → Project 的简单链路开始验证再逐步实现 Join 和 Aggregate 等复杂算子。列式存储是向量化的前提行式存储上的向量化收益有限。DuckDB 是嵌入式向量化引擎的参考实现值得研究其算子设计。