ChatGPT聊天归档实战:从数据收集到高效检索的完整解决方案
1. 背景痛点:为什么“存日志”会拖垮系统
很多团队刚接入 ChatGPT 时,为了图省事,把对话直接追加到.log文件里,或者往 MySQL 的text字段一塞就完事。等业务量一上来,痛点立刻暴露:
- 全文检索靠
LIKE '%keyword%'直接锁表,查询 3 秒起步; - 单表过 500 万行后,索引失效,磁盘 IO 飙红;
- 日志文件分散在多机,grep 一次要敲 5 台服务器,排障效率低;
- 想做对话去重、敏感词过滤、上下文关联,得先写 200 行脚本洗数据。
一句话:原始方案在“可检索、可扩展、可维护”三角里,一个角都没占到。
2. 技术选型:Elasticsearch 为什么胜出
我把需求拆化成 4 个维度:全文检索速度、水平扩展成本、聚合能力、运维复杂度。用 10 分制给主流方案打分(纯主观,但来自线上踩坑):
| 维度 | ES | MongoDB | PG(BM25 插件) | ClickHouse |
|---|---|---|---|---|
| 全文检索 | 9 | 6 | 7 | 5 |
| 水平扩展 | 9 | 7 | 5 | 8 |
| 聚合统计 | 8 | 7 | 8 | 10 |
| 运维门槛 | 6 | 7 | 8 | 7 |
结论:
- 如果只想做“搜索+轻量聚合”,MongoDB 勉强够用,但分词器对中文支持弱;
- PG 在单节点表现好,扩容就得玩分库分表,改造成本高;
- ClickHouse 聚合怪兽,可全文检索不是亲儿子;
- ES 在“搜得到”与“扩得动”之间最均衡,社区现成的中文分词、SQL 风格 DSL 都成熟,于是拍板。
3. 核心实现:从裸日志到可检索文档
3.1 Python 侧:先把对话“洗”成结构化数据
原始日志长这样:
2024-05-20 14:23:10 user: 如何做番茄炒蛋? 2024-05-20 14:23:12 assistant: 首先准备鸡蛋3个,番茄2个...清洗目标:一行 JSON 带字段session_id,role,content,timestamp,msg_hash(去重)。
import json, hashlib, logging, pathlib, datetime as dt from typing import Iterator logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") def parse_raw_log(file_path: pathlib.Path) -> Iterator[dict]: session_id, msgs = Noneaca0", [] # 假设按时间排序 for line in file_path.read_text(encoding="utf8").splitlines(): try: ts_str, body = line.split(" ", 1) role, content = body.split(": ", 1) ts = dt.datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") if not session_id: session_id = hashlib.md5(ts_str.encode()).hexdigest()[:8] msg_hash = hashlib.sha256(content.encode()).hexdigest()[:16] yield { "session_id": session_id, "role": role.strip(), "content": content.strip(), "timestamp": ts.isoformat(), "msg_hash": msg_hash } except Exception as e: logging.warning("bad line: %s | err: %s", line, e)跑完拿到.jsonl,下一步喂给 ES。
3.2 Elasticsearch 索引设计
需求:
- 按
content全文搜索,高亮; - 按
session_id拉取完整上下文; - 按
timestamp范围过滤; - 中文分词,兼容拼音搜索。
映射模板如下(已含 IK + pinyin 插件):
PUT chatgpt_2024-05-20 { "settings": { "number_of_shards": 2, "number_of_replicas": 1, "refresh_interval": "5s", "analysis": { "analyzer": { "ik_pinyin": { "type": "custom", "tokenizer": "ik_max_word", "filter": ["pinyin"] } -ci": { "type": "pinyin", "keep_first_letter": true, "keep_separate_first_letter": false } } } }, "mappings": { "properties": { "session_id": {"type": "keyword"}, "role": {"type": "keyword"}, "content": { "type": "text", "analyzer": "ik_pinyin", "search_analyzer": "ik_smart" }, "timestamp": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}, "msg_hash": {"type": "keyword"} } } }要点:
- 索引按天滚动,方便冷热分离;
refresh_interval放宽到 5s,写吞吐提升 30%;- 搜索时用
ik_smart减少冗余词项,提高精度。
3.3 批量写入 & 近实时查询
单条index在高并发下会把 CPU 打满,官方推荐bulk。Python 端用elasticsearch.helpers.streaming_bulk:
from elasticsearch import Elasticsearch, helpers import json, time, logging es = Elasticsearch(["http://es-node1:9200"], retry=2, retry_on_timeout=True) def gendoc(path): for msg in parse_raw_log(path): yield { "_index": f"chatgpt_{msg['timestamp'][:10]}", "_id": msg["msg_hash"], "_source": msg } def bulk_index(path, chunk=800): successes = 0 for ok, item in helpers.streaming_bulk(es, gendoc(path), chunk_size=chunk, max_retries=3): if ok: successes += 1 else: logging.error("bulk failed: %s", item) logging.info("indexed %d docs", successes) if __name__ == "__main__": bulk_index(pathlib.Path("chat.log"))优化技巧:
- chunk 大小 800~1000 条(单条 1 KB 左右)时,写入 QPS 峰值最高;
- 客户端开
gzip压缩,内网带宽省一半; - 写前把
refresh_interval调到 -1,写完再改回 5s,能防止 segment 频繁 merge。
4. 性能考量:压测与扩容
测试环境:3 节点(8C32G,SSD),ES 7.17。
| 指标 | 单节点 | 3 节点默认分片 | 3 节点+冷热 |
|---|---|---|---|
| 写 QPS | 8k | 22k | 28k |
| 平均检索 RT | 320ms | 110ms | 80ms |
| 磁盘/GB | 100 | 100 | 62(冷压缩) |
扩容策略:
- 写高峰加“热节点”,SSD 盘,分片 2∶1;
- 7 天后索引移到“冷节点”,1 副本 +
best_compression,磁盘节省 38%; - 搜索并发高时,给协调节点升配 + 增加副本,读性能线性提升。
5. 避坑指南:中文搜索、冷热分离、敏感词
5.1 分词器选错,用户搜“番茄”找不到“西红柿”
IK 细粒度用ik_max_word,但搜索端用ik_smart可减少跳词。若业务要兼容拼音,一定装elasticsearch-analysis-pinyin,否则“fanqie” 搜不出结果。上线前跑一遍 cws_eval 工具,看看召回率。
5.2 冷热数据分离最佳实践
- 索引按天滚动模板 + ILM:热阶段 2 副本,冷阶段 0 副本(只查不吊打);
- 冷节点机械盘 +
index.codec: best_compression,CPU 消耗 <5%; - 查询侧加
preference=_local,尽量让协调节点本地收片,减少跨网络。
5.3 敏感信息过滤
在parse_raw_log阶段加正则,手机号、身份证、银行卡号直接替换成<MASK>,再写 ES。正则示例:
import re PHONE_RE = re.compile(r"(?<!\d)(1[3-9]\d{9})(?!\d)") def mask_sensitive(text: str) -> str: return PHONE_RE.sub("<MASK>", text)好处:
- 即使索引泄露,也不会带真实隐私;
- 正则跑在写入前,搜索端无感知,零额外开销。
6. 开放问题:多租户隔离怎么做?
当前方案按业务日期切索引,如果平台服务多个企业,如何做到:
- 租户 A 看不到租户 B 的数据;
- 高优租户查询慢查询隔离;
- 索引仍复用冷热分层,节省成本。
你可以思考:
- 在 mapping 里加
tenant_id字段,结合routing把同一租户路由到相同分片? - 还是直接给每个租户建独立索引,用 ILM 统一管理?
- 查询层用 Kibana Spaces + Role 过滤,还是自己在网关加一层行级权限?
7. 写在最后:把“归档”做成“资产”
聊天记录不是垃圾,而是优化模型、审计合规、用户运营的金矿。把 ES 链路搭好后,我最大的感受是:搜索响应从 3 秒降到 80ms,产品同学终于愿意用数据说话,而不是“拍脑袋”。整套代码我放在 GitHub 模板仓库(示例地址),你可以直接拿去改。
如果你想亲手搭一遍,又担心卡在某个小环节,可以试试这个动手实验:
从0打造个人豆包实时通话AI
实验里把 ASR→LLM→TTS 整条链路拆成 7 个可运行脚本,我跟着做完,对“实时语音转文本再落库”的延迟优化有了更直观的体感。里面的日志归档部分,用的正是本文这套 ES 方案,算是把“学”和“用”串在了一起。小白也能顺利体验——至少我这种非 Java 栈选手,一下午就跑通了。
下一步,你准备把聊天记录拿来训练专属模型,还是先做一套租户级 SaaS 检索平台?欢迎留言聊聊你的打算。