背景痛点:电商客服接口的三座大山
电商大促 0 点瞬间流量是日常的 30 倍,智能客服接口必须在 500 ms 内返回,否则用户直接转人工,成本翻倍。
- SLA 99 %:大促 1 h 内不可用时间 ≤ 36 s,任何一次 Full GC 或慢 SQL 都会超标。
- 多轮状态保持:用户问“优惠券”→“退货”→“优惠券还能用吗”,对话 ID 跨 3 次请求,状态不能丢。
- 突发流量:秒杀入口 5 s 内涌入 10 w 并发,线程池被打爆后整站雪崩。
技术对比:同步、异步、长连接怎么选
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 同步 REST | 开发简单,调试直观 | 线程阻塞,高并发下 CPU 空转 | 低频后台查询 |
| 异步队列(Celery/Kafka) | 削峰填谷,可重试 | 链路长,排障难 | 订单、物流等可延迟回答 |
| 长连接 WebSocket | 全双工,低延迟 | 连接管理复杂,内存占用高 | 网页端实时对话 |
| 短连接 HTTP/2 | 多路复用,兼容 CDN | 仍有 TLS 握手开销 | App 端主流方案 |
结论:
- C 端问答用短连接 + 异步队列兜底;
- B 端坐席后台用 WebSocket 推送,减少轮询。
核心实现:FastAPI + Redis 状态管理
以下示例基于 Python 3.11,单文件即可跑通,含 JWT 鉴权、异步推理、状态缓存。
1. 项目结构
ai-service/ ├─ main.py # FastAPI 入口 ├─ model.py # 业务模型 ├─ redis_pool.py # 连接池 └─ settings.py # 配置2. 依赖安装
pip install fastapi[all] redis httpx python-jose3. JWT 鉴权工具(时间复杂度 O(1))
# settings.py from datetime import datetime, timedelta from jose import jwt SECRET = "dev_secret" ALG = "HS256" def create_token(uid: str) -> str: payload = {"sub": uid, "exp": datetime.utcnow() + timedelta(hours=2)} return jwt.encode(payload, SECRET, algorithm=ALG)4. Redis 连接池(全局复用)
# redis_pool.py import aioredis from contextlib import asynccontextmanager POOL = aioredis.ConnectionPool.from_url( "redis://@127.0.0.1:6379/0", max_connections=50, # 根据容器规格调整 retry_on_timeout=True ) @asynccontextmanager async def get_redis(): redis = aioredis.Redis(connection_pool=POOL) try: yield redis finally: await redis.close() # 归还连接池,非断开5. 对话状态模型
# model.py from pydantic import BaseModel, Field from typing import List, Optional class Message(BaseModel): role: str # user / bot content: str class ChatReq(BaseModel): uid: str text: str dialog_id: Optional[str] = Field(None, description="为空时后端生成") class ChatResp(BaseModel): dialog_id: str answer: str cost_ms: int6. FastAPI 异步接口
# main.py import uuid, time, httpx from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from model import ChatReq, ChatResp from redis_pool import get_redis from settings import create_token app = FastAPI(title="E-commerce AI-CS") security = HTTPBearer() async def verify_token(cred: HTTPAuthorizationCredentials = Depends(security)): from jose import jwt try: payload = jwt.decode(cred.credentials, settings.SECRET, algorithms=[settings.ALG]) return payload["sub"] except Exception: raise HTTPException(status_code=401, detail="Invalid token") @app.post("/chat", response_model=ChatResp) async def chat(req: ChatReq, uid: str = Depends(verify_token)): start = time.time() dialog_id = req.dialog_id or uuid.uuid4().hex # 冲突概率 2^-122,可忽略 async with get_redis() as redis: # 拉取历史 key = f"dlg:{dialog_id}" history: List[Message] = [] raw = await redis.lrange(key, 0, -1) for item in raw: history.append(Message.parse_raw(item)) # 调第三方大模型(异步) answer = await call_llm(req.text, history) # 见下 # 写回缓存,设置 30 min 过期 pipe = redis.pipeline() pipe.lpush(key, Message(role="user", content=req.text).json()) pipe.lpush(key, Message(role="bot", content=answer).json()) pipe.expire(key, 1800) await pipe.execute() cost = int((time.time() - start) * 1000) return ChatResp(dialog_id=dialog_id, answer=answer, cost_ms=cost) async def call_llm(text: str, history: List[Message]) -> str: """调用外部大模型,带重试""" url = "https://api.llm.vendor/v1/completions" headers = {"Authorization": "Bearer YOUR_KEY"} payload = { "prompt": format_prompt(text, history), "max_tokens": 200, "temperature": 0.3 } async with httpx.AsyncClient(timeout=5) as client: for attempt in range(1, 4): # 最多 3 次 try: r = await client.post(url, json=payload) r.raise_for_status() return r.json()["choices"][0]["text"].strip() except Exception as e: if attempt == 3: return "系统繁忙,请稍后再试" await asyncio.sleep(0.5 * attempt)7. 运行
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4性能优化:把 QPS 从 500 拉到 3000
压测环境
- 4C8G 容器,单副本,FastAPI workers=4。
- 使用 locust,200 并发用户,持续 5 min。
优化前后对比
| 阶段 | 平均 RT | P99 RT | QPS | CPU |
|---|---|---|---|---|
| 初始同步 + 短连接 | 380 ms | 1.2 s | 520 | 95 % |
| ① 连接池复用 | 220 ms | 600 ms | 1100 | 80 % |
| ② 异步 LLM | 150 ms | 400 ms | 1800 | 75 % |
| ③ Redis Pipeline + 本地缓存热点答案 | 90 ms | 220 ms | 3200 | 65 % |
- 关键参数
# httpx 连接池 limits = httpx.Limits(max_keepalive=20, max_connections=100) # Redis 池见上文,max_connections=50 # FastAPI 层开启 gzip from fastapi.middleware.gzip import GZipMiddleware app.add_middleware(GZipMiddleware, minimum_size=500)- 本地缓存
对“发货时间”“会员等级”等命中率 > 5 % 的 query,直接缓存 JSON,TTL 60 s,减少 30 % Redis 流量。
避坑指南:三个血与泪的教训
对话 ID 冲突
uuid.uuid4 重复概率极低,但日志里仍出现“串号”。根因是 Nginx 转发时丢失了大小写,导致前端生成 ID 被截断。
→ 统一后端生成,前端只读;ID 限长 32 位十六进制,正则校验^[0-9a-f]{32}$。第三方重试策略
指数退避容易把瞬时故障拖成长尾。
→ 采用“线性退避 + 熔断”:失败 3 次熔断 10 s,同时返回兜底文案,避免用户空等。敏感信息过滤
用户会上传订单截图含手机号,模型可能原样返回。
→ 接入正则脱敏 + 人名库,命中即替换为*,并记录审计日志;正则耗时 ≤ 1 ms,对 RT 影响可忽略。
生产 checklist
- 日志:统一 trace-id,方便跨 Redis、LLM 链路追踪。
- 限流:令牌桶 500/s,超量返回 429,前端弹“客服繁忙”。
- 监控:Prometheus 采集
chat_qps、chat_latency_p99,告警阈值 P99 > 500 ms。 - 灰度:按 UID 尾号灰度 5 %,观察 30 min 无异常再全量。
思考题:如何设计分级降级策略?
当 LLM 侧 503 大面积报错时,你会先降级到检索式 FAQ,还是直接返回“人工客服稍后联系”?
欢迎分享你的分级条件、开关实现与数据回流方案。