背景痛点:传统客服系统在高并发场景下的三大顽疾
去年双十一,我们组负责的智能客服在零点瞬间被打爆:CPU 飙到 95%,TP99 延迟 3.8 s,用户排队 7 k+,老板在群里连发十个“”。复盘发现,老架构在三个地方掉链子:
- 突发流量无弹性:Tomcat 同步线程池打满后,新连接直接 502,水平扩容需要 3~5 min,而峰值 30 s 就过去了。
- 长上下文保持困难:会话状态放在 Redis String,每轮对话要回传 4 k token 的上下文,网络 RTT 20 ms 变成 200 ms,带宽翻倍。
- 意图识别准确率低:关键词+正则的 NLU 在促销语料上 F1 只有 0.72,一旦用户换种问法就“转人工客服”,人工坐席瞬间被冲垮。
痛定思痛,我们决定把“大模型”搬上线,但前提是不能再让延迟和成本失控,于是有了下面这套高并发低延迟的架构。
架构设计:一张图看懂三层解耦
接入层(Edge Layer)
- 统一接入网关基于 Envoy,支持 HTTP/2、gRPC、WebSocket 多协议,TLS 卸载后内网走明文,减少 1-RTT。
- 无状态设计,只做协议转换与限流,CPU 密集留给后方。
推理层(Inference Layer)
- 模型选型:Transformer 自回归解码,相比 RNN 并行度↑10×,长依赖靠 Attention,上下文 8 k token 无压力。
- 通信协议:gRPC streaming 双工,对比 WebSocket 省掉 20% 头部流量,且天然支持多路复用。
- 推理节点按“模型+缓存”同机部署,避免跨机取 KV Cache 的 50 μs 网络抖动。
数据层(Data Layer)
- 对话状态 → Redis Hash,字段级过期(TTL 30 min),减少大 Key 热迁移。
- 向量检索 → Milvus HNSW,128 维 embedding 平均 10 ms 召回。
- 日志链路:Kafka → Flink → Iceberg,离线训练与实时监控同源,避免样本偏差。
核心实现:Python 异步流水线示例
下面代码演示一次“用户提问→大模型回答”的完整异步路径,符合 PEP8,关键复杂度已标注。
# aio_gateway.py import asyncio, grpc, time from grpc.aio import insecure_channel from model_pb2 import QueryRequest, QueryResponse from model_pb2_grpc import LLMStub from circuit import CircuitBreaker # 自写熔断器 SEMA = asyncio.Semaphore(800) # 限制并发,O(1) 空间 CB = CircuitBreaker(failure=10, timeout=60) async def handle(request_id: str, uid: str, text: str) -> str: """Entrypoint: 网关 -> 推理层""" async with SEMA: start = time.perf_counter() # 1. 负载均衡:pick 1 台推理 pod,grpc 自带 round-robin channel = await insecure_channel("inference:50051") stub = LLMStub(channel) # 2. 组装 prompt,带上历史对话 prompt = await build_prompt(uid, text) # Redis 读取,O(1) # 3. 熔断 + 超时 resp = await CB.call(stub.Generate, QueryRequest(prompt=prompt), timeout=1.5) # 4. 后处理 await save_reply(uid, resp.answer) # 异步写,不阻塞 cost = int((time.perf_counter() - start)*1000) logger.info("req=%s latency=%d ms", request_id, cost) return resp.answer熔断器伪代码(时间复杂度 O(1)):
class CircuitBreaker: def __init__(self, failure: int, timeout: int): self.failure = failure self.timeout = timeout self.fail_cnt = 0 self.state = "closed" # closed/open/half-open self.last_open = 0 async def call(self, func, *args, **kw): if self.state == "open" and time.time() - self.last_open < self.timeout: raise RuntimeError("circuit open") try: res = await func(*args, **kw) self.fail_cnt = 0 self.state = "closed" return res except Exception as e: self.fail_cnt += 1 if self.fail_cnt >= self.failure: self.state = "open" self.last_open = time.time() raise负载均衡由 gRPC 内置的pick_first+round_robin完成,无需重复造轮子;若需自定义,可改custom_load_balancer.py继承grpc.lb_policy.base.Picker。
性能优化:压测数据与调参实录
- 压测环境:48 vCPU 物理机 * 10,模型 7B 参数量化到 int8,显存 6 GB。
- 指标对比(单节点):
| 策略 | QPS | TP99 / ms | 显存 GB | 备注 |
|---|---|---|---|---|
| 原始 fp16 | 120 | 580 | 13 | 无缓存 |
| + KV Cache | 280 | 220 | 13 | 复用历史 |
| + int8 量化 | 450 | 190 | 6 | 精度下降 0.8% |
| + 缓存+量化 | 800 | 160 | 6 | 目标达成 |
KV Cache 参数:
max_seq_len=8192, max_batch_size=64, cache_store_dtype=int8, layer_num=32
显存占用公式:2 * layer * head * hidden * seq * batch / 1024^3 ≈ 5.7 GB,与实测吻合。缓存策略:
对“热点问题”答案做 Redis 负一缓存(TTL=300 s),命中率 42%,回源量直接减半;向量检索结果缓存 10 s,防止相同问题反复刷 Milvus。
避坑指南:那些半夜 2 点的踩坑记录
对话状态管理
错误做法:把整段 JSON 塞 Redis String,每次全量读取→修改→写回,并发高时出现写冲突,数据漂移。
正确姿势:Hash 分散存储user_id:{turn_id -> json_blob},配合 Lua 脚本保证原子自增 turn_id,读写量减少 60%。敏感词过滤边界 case
场景:用户输入“加微信123”,中间插零宽空格\u200b。
解决:正则前统一 NFKC 归一化,再按字符级扫描,时间复杂度 O(n) 不变;同步维护 AC 自动机,多模式匹配一次完成。模型热更新
直接替换容器导致旧请求被杀,用户看到“回答一半”。
改用滚动 + 双缓存:新模型预热 100 请求后注册到 Consul,网关观察 p99 延迟 < 200 ms 才下掉旧节点,实现零中断。
总结与扩展:下一步往哪走
架构演进路线
单机推理 → 多机流水线(prefill/decode 分离)→ 动态批大小调度 → 边缘 GPU 联邦推理,目标把延迟压进 100 ms,成本再降 30%。可供继续探索的方向
- 投机解码(Speculative Decoding):小模型打草稿,大模型并行验证,理论 2× 加速。
- 长上下文压缩:基于旋转位置编码的“窗口滑动+召回”,把 32 k token 压到 4 k,显存节省 50%。
- 强化学习人类反馈(RLHF):用拒答率、满意度实时奖励,微调后人工转接率下降 15%。
把大模型搬进客服不是“换引擎”那么简单,而是把高并发、低延迟、状态一致性、成本优化全链路重新撸一遍。上面这套打法帮我们扛住 6w QPS 的秒杀峰值,TP99 稳在 160 ms,成本只有原来的 38%。如果你也在做类似系统,希望这份笔记能让你少走几个夜班。