背景痛点:智能客服 API 的三座大山
去年“618”大促,我负责的智能客服接口在 3 k QPS 峰值下直接“躺平”:P99 延迟飙到 2.3 s,20% 意图识别结果前后矛盾,最惨的是用户连续追问“那运费呢?”——系统把上下文丢了,只能回复“亲,您想问什么?”。
复盘后把问题拆成三块:
- 并发请求处理:同步 Flask 架构,worker 打满后 CPU 空转,I/O 等待占 60%。
- 多轮对话状态维护:session 存在本地内存,横向扩容即丢失;用户刷新页面就“换了个脑子”。
- 意图识别歧义:关键词+规则模板,新品上线后同义词爆炸,准确率从 92% 跌到 74%。
技术选型:REST vs GraphQL vs gRPC 实测
先用 wrk 在 8C16G 容器里压测,场景均为“单轮 FAQ 查询”,网络 RTT 0.3 ms,三次握手:
- 空载回显:REST 22 k、GraphQL 19 k、gRPC 38 k QPS。
- P99 延迟(QPS 10 k):REST 110 ms、GraphQL 125 ms、gRPC 55 ms。
- CPU 占满时吞吐量:gRPC 仍保持 32 k,REST 跌到 9 k。
结论:对话式场景请求包体小、往返多,gRPC 的 HTTP/2 多路复用 + Protobuf 序列化优势明显;但 REST 易读、前后端直接调试方便。最终采用“外部 REST 入口 + 内部 gRPC 微服务”双层架构,兼顾开发与性能。
核心实现:FastAPI + BERT + 状态机
1. FastAPI 异步端点
# main.py from fastapi import FastAPI, Request, HTTPException, status from fastapi.responses import ORJSONResponse import aioredis import asyncio from限流 import RateLimiter # 自写模块 app = FastAPI(default_response_class=ORJSONResponse) limiter = RateLimiter(times=60, per=60) # 单 IP 60 次/分 redis = aioredis.from_url("redis://localhost:6379/1", decode_responses=True)2. BERT 意图分类
# intent.py from transformers import BertTokenizer, BertForSequenceClassification import torch MODEL_PATH = "./bert-faq-cls" tokenizer = BertTokenizer.from_pretrained(MODEL_PATH) model = BertForSequenceClassification.from_pretrained(MODEL_PATH) model.eval() def predict(text: str) -> tuple[str, float]: inputs = tokenizer(text, return_tensors="pt", truncation=True, padding="max_length", max_length=64) with torch.no_grad(): logits = model(**inputs).logits probs = torch.softmax(logits, dim=-1) idx = int(torch.argmax(probs)) label = model.config.id2label[idx] confidence = float(probs[0, idx]) return label, confidenceAttention Mask 已由 tokenizer 自动生成,无需手写。
3. 对话状态机(PlantUML)
@startuml [*] --> Idle: 新建 session Idle --> AwaitIntent: 收到首轮 query AwaitIntent --> AwaitIntent: 置信度<阈值\n请求澄清 AwaitIntent --> Answered: 置信度≥阈值\n返回答案 Answered --> AwaitIntent: 追问 Answered --> [*]: 会话超时 30min @enduml状态持久化到 Redis Hash,key 格式cx:{uid},field 包括state、last_intent、history_ids。
代码示例:带异常处理、日志、限流
# router.py import logging from datetime import datetime import json from fastapi import Depends from pydantic import BaseModel, Field class QueryIn(BaseModel): uid: str = Field(..., regex=r"^[A-Za-z0-9]{6,32}$") query: str = Field(..., min_length=1, max_length=256) logger = logging.getLogger("cx") @app.post("/v1/ask") async def ask(q: QueryIn, request: Request, _=Depends(limiter)): try: # 1. 限流已校验 # 2. 敏感词过滤 if await has_sensitive(q.query): logger.warning(f"Sensitive detected uid={q.uid}") raise HTTPException(status_code=400, detail="Invalid query") # 3. 意图识别 label, conf = predict(q.query) logger.info(f"uid={q.uid} query={q.query} label={label} conf={conf:.2f}") # 4. 状态机迁移 state = await redis.hget(f"cx:{q.uid}", "state") or "Idle" if conf < 0.65 and state != "AwaitIntent": await redis.hset(f"cx:{q.uid}", mapping={ "state": "AwaitIntent", "last_intent": label, "ts": datetime.now().isoformat() }) return {"reply": "抱歉,我没理解您的问题,能再具体点吗?", "confidence": conf} # 5. 返回答案 answer = fetch_answer(label) await redis.hset(f"cx:{q.uid}", mapping={ "state": "Answered", "last_intent": label, "ts": datetime.now().isoformat() }) return {"reply": answer, "confidence": conf} except asyncio.TimeoutError: logger.error("Redis timeout") raise HTTPException(status_code=503, detail="Cache unavailable") except Exception as e: logger.exception("Unhandled") raise HTTPException(status_code=500, detail="Internal error")- 异步上下文管理:统一用
async/await,模型加载放在startup事件,避免每次请求重复torch.load。 - 请求限流:基于 Redis + Token bucket,单 IP 超过阈值直接 429,保护后端 GPU 推理节点。
- 敏感信息过滤:正则 + 敏感词库,命中即拒绝,日志脱敏打印前 16 位 UID。
生产建议:让接口扛住 10 k QPS
1. 冷启动优化
BERT 模型 420 M,首次推理 3 s。采用“预热脚本”+“进程保活”:
- 容器启动后,同步执行
warmup()对 100 条样本推理,CUDA kernel 编译完成后再注册 Consul。 - 设置
uvicorn --workers 1+preload,单进程多线程,避免多 worker 重复占显存。
2. 对话 session 的 Redis 策略
- 写:每次状态变更
HSET+EXPIRE 1800,30 min 无访问自动淘汰。 - 读:使用
redis.pipeline()批量拉取,减少 RTT。 - 大促时启用 Redis on Flash,冷数据落 SSD,内存节省 45%。
3. 灰度发布 AB 测试
- 在 Header 带上
X-Canary=1的流量打入新模型,Prometheus 统计意图置信度分布。 - 指标:平均置信度↑、拒识率↓、人工转接率↓,连续 24 h 无异常再全量。
安全合规:GDPR 数据擦除
欧盟用户行使“被遗忘权”时,系统需 30 天内删除可识别数据。实现方案:
- 数据分层:日志、Redis、MySQL、对象存储。
- 统一消息队列:GDPR 主题收到
{"uid":"u123","action":"erase"}。 - 各组件监听消息:
- 日志:按 UID 文件名滚动删除,或采用 Hash 分片后覆写。
- Redis:执行
DEL cx:u123。 - MySQL:替换
email、phone为NULL,UPDATE语句写入伪删除时间戳。
- 回执:所有组件上报 ACK,中心服务汇总后邮件通知用户。
结论与开放讨论
经过两轮压测,我们把 P99 延迟从 2.3 s 压到 78 ms,意图识别准确率回到 93%,服务器节省 30%。但线上仍有一个“灰色地带”:当用户意图置信度 0.5~0.65 之间,系统到底该给出默认兜底回复,还是直接转人工客服?不同业务团队立场各异——产品希望“零误杀”,运营则怕人力爆炸。
你的团队会怎么选?或者有没有更聪明的第三种策略?欢迎留言一起拆招。