一、分块技术的本质与核心挑战
1.1 为什么分块是 RAG 系统的 "命门"
很多人认为 RAG 的效果主要取决于大模型和嵌入模型,但实际上分块质量对 RAG 效果的影响占比超过 40%。一个好的分块策略可以让普通的嵌入模型和大模型产生优秀的效果,而一个差的分块策略即使使用最好的模型也无法得到准确的回答。
分块技术的本质是:在有限的上下文窗口内,最大化每个块的语义完整性和检索相关性。
1.2 分块的三大核心矛盾
所有分块算法都是在平衡这三个相互矛盾的目标:
- 上下文完整性:每个块应该包含足够的上下文信息,能够独立表达一个完整的语义单元
- 检索相关性:每个块应该足够小,能够精准匹配用户的查询
- 模型窗口限制:每个块的大小不能超过嵌入模型和大模型的最大 token 限制
| 分块大小 | 上下文完整性 | 检索相关性 | 模型窗口占用 | 适用场景 |
|---|---|---|---|---|
| 小(128-256 字符) | ❌ 差 | ✅ 好 | ✅ 小 | 问答系统、关键词检索 |
| 中(512-1024 字符) | ⭐ 一般 | ⭐ 一般 | ⭐ 一般 | 通用场景 |
| 大(2048-4096 字符) | ✅ 好 | ❌ 差 | ❌ 大 | 长文档理解、摘要生成 |
1.3 分块效果不好的典型表现
如果你的 RAG 系统出现以下问题,90% 是分块策略有问题:
- 明明文档里有答案,但系统总是说 "找不到相关信息"
- 回答不完整,只提到了部分内容
- 回答出现幻觉,引用了不存在的信息
- 检索到的文档块和问题不相关
- 同一个答案被分散在多个块中,无法整合
二、主流分块算法深度解析与实现
2.1 算法一:固定长度分块(Fixed-length Chunking)
2.1.1 原理
将文本按照固定的字符数或 token 数分割成大小相等的块,相邻块之间保留一定的重叠部分,避免上下文断裂。
2.1.2 优缺点
| 优点 | 缺点 |
|---|---|
| 实现简单,速度极快 | 容易在语义单元中间分割 |
| 可预测性强,容易调优 | 无法处理不同长度的语义单元 |
| 资源消耗低 | 重叠部分会导致信息冗余 |
2.1.3 核心参数
chunk_size:每个块的最大字符数 /token 数chunk_overlap:相邻块之间的重叠字符数 /token 数separator:分割符,优先在分割符处分割
2.1.4 最佳实践参数
| 嵌入模型 | 推荐 chunk_size(字符) | 推荐 chunk_overlap(字符) |
|---|---|---|
| BGE-small | 256-512 | 32-64 |
| BGE-base | 512-768 | 64-128 |
| BGE-large | 768-1024 | 128-256 |
2.1.5 代码实现(纯 Python 版,无依赖)
import re from typing import List, Tuple def fixed_length_chunk( text: str, chunk_size: int = 1024, chunk_overlap: int = 200, separators: List[str] = ["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""] ) -> List[str]: """ 纯Python实现的固定长度分块器,优先在语义分隔符处分割 :param text: 输入文本 :param chunk_size: 每个块的最大字符数 :param chunk_overlap: 相邻块之间的重叠字符数 :param separators: 分隔符优先级列表,从高到低 :return: 分块后的文本列表 """ if not text or chunk_size <= 0: return [] if chunk_overlap >= chunk_size: raise ValueError("chunk_overlap不能大于等于chunk_size") chunks = [] start = 0 text_length = len(text) while start < text_length: # 计算当前块的结束位置 end = min(start + chunk_size, text_length) # 从后往前找最合适的分隔符 actual_end = end for separator in separators: separator_pos = text.rfind(separator, start, end) if separator_pos != -1: actual_end = separator_pos + len(separator) break # 添加当前块 chunk = text[start:actual_end].strip() if chunk: chunks.append(chunk) # 移动到下一个块的起始位置 start = actual_end - chunk_overlap # 防止无限循环 if start < 0: start = 0 # 如果剩余文本不足一个块,直接添加 if end == text_length: break return chunks2.1.6 代码实现(LangChain 版,工业级标准)
from langchain_text_splitters import RecursiveCharacterTextSplitter def langchain_fixed_length_chunk( text: str, chunk_size: int = 1024, chunk_overlap: int = 200 ) -> List[str]: """ LangChain官方实现的递归字符分块器,工业界标准 """ text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len, is_separator_regex=False, separators=[ "\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", "", ] ) chunks = text_splitter.split_text(text) return chunks2.2 算法二:语义分块(Semantic Chunking)
2.2.1 原理
基于语义相似度的智能分块方法。首先将文本分割成句子,然后计算相邻句子之间的语义相似度,在相似度最低的地方分割,确保每个块内部的语义一致性最高。
2.2.2 优缺点
| 优点 | 缺点 |
|---|---|
| 分割位置符合语义边界 | 计算速度慢,需要嵌入模型 |
| 块内语义一致性高 | 对嵌入模型质量依赖大 |
| 检索准确率高 | 块大小不可预测 |
2.2.3 核心参数
sentence_window_size:计算相似度时考虑的句子窗口大小similarity_threshold:相似度阈值,低于阈值则分割min_chunk_size:最小块大小,防止生成过小的块
2.2.4 代码实现(纯 Python 版)
import numpy as np from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity from typing import List def split_into_sentences(text: str) -> List[str]: """ 纯Python实现的中文句子分割器(与之前的版本一致) """ text = text.replace('\r\n', '\n').replace('\r', '\n') sentence_endings = r'([。!?.!?]+)(?![\da-zA-Z])' parts = re.split(sentence_endings, text) sentences = [] for i in range(0, len(parts)-1, 2): if parts[i].strip(): sentence = parts[i].strip() + parts[i+1] sentences.append(sentence) if len(parts) % 2 == 1 and parts[-1].strip(): sentences.append(parts[-1].strip()) return [s.strip() for s in sentences if s.strip() and len(s.strip()) > 1] def semantic_chunk( text: str, model_name_or_path: str = "./models/bge-large-zh-v1.5", sentence_window_size: int = 3, similarity_threshold: float = 0.7, min_chunk_size: int = 100, device: str = "cpu" ) -> List[str]: """ 纯Python实现的语义分块器 :param text: 输入文本 :param model_name_or_path: 嵌入模型路径 :param sentence_window_size: 句子窗口大小 :param similarity_threshold: 相似度阈值 :param min_chunk_size: 最小块大小(字符) :param device: 运行设备 :return: 分块后的文本列表 """ # 第一步:将文本分割成句子 sentences = split_into_sentences(text) if len(sentences) <= 1: return [text] # 第二步:生成句子嵌入 model = SentenceTransformer(model_name_or_path, device=device) embeddings = model.encode(sentences, normalize_embeddings=True) # 第三步:计算相邻句子窗口的相似度 similarities = [] for i in range(len(sentences) - sentence_window_size): # 计算当前窗口和下一个窗口的嵌入 current_window = embeddings[i:i+sentence_window_size].mean(axis=0).reshape(1, -1) next_window = embeddings[i+1:i+1+sentence_window_size].mean(axis=0).reshape(1, -1) # 计算余弦相似度 similarity = cosine_similarity(current_window, next_window)[0][0] similarities.append(similarity) # 第四步:找到相似度低于阈值的分割点 split_points = [] for i, sim in enumerate(similarities): if sim < similarity_threshold: split_points.append(i + sentence_window_size) # 添加开始和结束点 split_points = [0] + split_points + [len(sentences)] # 第五步:生成块 chunks = [] for i in range(len(split_points) - 1): start = split_points[i] end = split_points[i+1] chunk_sentences = sentences[start:end] chunk_text = "".join(chunk_sentences) # 如果块太小,合并到下一个块 if len(chunk_text) < min_chunk_size and i < len(split_points) - 2: continue if chunk_text.strip(): chunks.append(chunk_text.strip()) return chunks2.3 算法三:递归分块(Recursive Chunking)
2.3.1 原理
递归分块是固定长度分块的改进版,专门用于处理长文档。它按照分隔符的优先级从高到低递归分割文本,直到每个块的大小小于指定的 chunk_size。
例如,分隔符优先级为["\n\n", "\n", "。", " "],则:
- 首先用
\n\n分割文本 - 如果分割后的块仍然大于 chunk_size,再用
\n分割 - 如果还是太大,再用
。分割 - 最后用空格分割
2.3.2 优缺点
| 优点 | 缺点 |
|---|---|
| 尽可能保留语义单元的完整性 | 仍然是基于字符的分割,不是真正的语义分割 |
| 比简单固定长度分块效果好 | 块大小仍然不可预测 |
| 实现简单,速度快 | 对复杂结构的文档处理效果一般 |
2.3.3 代码实现
def recursive_chunk( text: str, chunk_size: int = 1024, chunk_overlap: int = 200, separators: List[str] = ["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""], depth: int = 0 ) -> List[str]: """ 纯Python实现的递归分块器 """ if not text or depth >= len(separators): return [text.strip()] if text.strip() else [] chunks = [] separator = separators[depth] # 用当前分隔符分割文本 parts = text.split(separator) current_chunk = "" for part in parts: if not part.strip(): continue # 加上分隔符 part_with_sep = part + separator # 如果当前块加上这个部分超过chunk_size if len(current_chunk) + len(part_with_sep) > chunk_size: # 如果当前块不为空,添加到结果 if current_chunk: chunks.append(current_chunk.strip()) # 如果这个部分本身就超过chunk_size,递归分割 if len(part_with_sep) > chunk_size: sub_chunks = recursive_chunk( part_with_sep, chunk_size, chunk_overlap, separators, depth + 1 ) chunks.extend(sub_chunks) current_chunk = "" else: current_chunk = part_with_sep else: current_chunk += part_with_sep # 添加最后一个块 if current_chunk.strip(): chunks.append(current_chunk.strip()) # 处理重叠 if chunk_overlap > 0 and len(chunks) > 1: overlapped_chunks = [] for i in range(len(chunks)): if i == 0: overlapped_chunks.append(chunks[i]) else: # 从前一个块的末尾取overlap个字符 overlap_text = chunks[i-1][-chunk_overlap:] overlapped_chunk = overlap_text + chunks[i] overlapped_chunks.append(overlapped_chunk) chunks = overlapped_chunks return chunks2.4 算法四:结构化分块(Structured Chunking)
2.4.1 原理
结构化分块是专门用于处理有格式的文档(如 PDF、Word、HTML)的分块方法。它首先提取文档的结构信息(标题、段落、列表、表格等),然后按照文档的自然结构进行分块,保留文档的层次关系。
2.4.2 优缺点
| 优点 | 缺点 |
|---|---|
| 保留文档的原始结构和层次关系 | 依赖文档解析器的质量 |
| 分块结果最符合人类阅读习惯 | 实现复杂,不同格式的文档需要不同的解析器 |
| 检索和回答效果最好 | 速度较慢 |
2.4.3 代码实现(基于 pdfplumber,最佳 PDF 解析器)
import pdfplumber from typing import List, Dict, Any def parse_pdf_with_structure(file_path: str) -> List[Dict[str, Any]]: """ 解析PDF文档并保留结构信息 :param file_path: PDF文件路径 :return: 包含结构信息的文本块列表 """ structured_blocks = [] with pdfplumber.open(file_path) as pdf: for page_num, page in enumerate(pdf.pages, 1): # 提取页面文本和布局信息 text = page.extract_text() if not text: continue # 简单的标题识别(基于字体大小) lines = page.lines for line in lines: font_size = line["chars"][0]["size"] if line["chars"] else 12 text_line = line["text"].strip() if not text_line: continue # 判断是否为标题(字体大于14且加粗) is_heading = False if font_size > 14 and any(char["fontname"].lower().endswith("bold") for char in line["chars"]): is_heading = True structured_blocks.append({ "text": text_line, "page": page_num, "font_size": font_size, "is_heading": is_heading, "level": 1 if is_heading else 2 }) return structured_blocks def structured_chunk( structured_blocks: List[Dict[str, Any]], max_chunk_size: int = 1024 ) -> List[Dict[str, Any]]: """ 基于文档结构进行分块 :param structured_blocks: 包含结构信息的文本块列表 :param max_chunk_size: 每个块的最大字符数 :return: 分块后的结果 """ chunks = [] current_chunk = [] current_chunk_size = 0 current_heading = None for block in structured_blocks: text = block["text"] text_size = len(text) # 如果是标题 if block["is_heading"]: # 如果当前块不为空,保存 if current_chunk: chunk_text = "\n".join([b["text"] for b in current_chunk]) chunks.append({ "text": chunk_text, "heading": current_heading, "pages": list(set([b["page"] for b in current_chunk])) }) current_chunk = [] current_chunk_size = 0 current_heading = text current_chunk.append(block) current_chunk_size = text_size else: # 如果加上这个块超过最大大小 if current_chunk_size + text_size > max_chunk_size and current_chunk: # 保存当前块 chunk_text = "\n".join([b["text"] for b in current_chunk]) chunks.append({ "text": chunk_text, "heading": current_heading, "pages": list(set([b["page"] for b in current_chunk])) }) current_chunk = [] current_chunk_size = 0 # 如果这个块本身就超过最大大小,分割成多个块 if text_size > max_chunk_size: sub_chunks = fixed_length_chunk(text, chunk_size=max_chunk_size, chunk_overlap=100) for sub_chunk in sub_chunks: chunks.append({ "text": sub_chunk, "heading": current_heading, "pages": [block["page"]] }) else: current_chunk.append(block) current_chunk_size = text_size else: current_chunk.append(block) current_chunk_size += text_size # 保存最后一个块 if current_chunk: chunk_text = "\n".join([b["text"] for b in current_chunk]) chunks.append({ "text": chunk_text, "heading": current_heading, "pages": list(set([b["page"] for b in current_chunk])) }) return chunks最佳实践总结
| 场景 | 推荐分块方法 | 推荐 chunk_size |
|---|---|---|
| 通用场景 | 递归分块 | 1024 |
| 对准确率要求高 | 结构化分块 | 1024 |
| 短文档问答 | 语义分块 | 512 |
| 长文档处理 | 递归分块 + 语义分块 | 2048 |
| 对性能要求高 | 固定长度分块 | 1024 |