背景痛点:自建AI客服最怕的三连击
第一次在公司内部落地 AI 客服时,我踩的坑比写的代码还多。总结下来就是“三连击”:
- 意图识别漂移:上线第一周准确率 92%,第二周掉到 78%,用户换了个问法就被打回原形。
- 多轮对话状态丢失:用户问“那运费呢?”,机器人回“请问您想了解哪座城市?”——上下文完全断片。
- 并发响应延迟:促销高峰期,200 QPS 直接把 Flask 打挂,客服群里瞬间刷屏“机器人又挂了”。
这三件事如果不在架构阶段解决,后面每改一次都是“拆东墙补西墙”。下文就按“选型→实现→压测→上线”四步走,把我在生产环境验证过的套路一次性摊开。
技术选型:为什么最后留下 TensorFlow+Flask
| 框架/平台 | 月活 5 万条请求成本 | 定制化自由度 | 本地部署 | 结论 |
|---|---|---|---|---|
| Dialogflow ES | 约 450 美元 | 低,实体模型黑盒 | 不可 | 小团队钱包先扛不住 |
| AWS Lex | 约 600 美元 | 中,Lambda 钩子受限 | 不可 | 同上,且中文支持一般 |
| Rasa 3.x | 0 美元 | 高,源码级修改 | 可 | 学习曲线陡,GPU 依赖大 |
| 自研 TF+Flask | 0 美元 | 极高,组件可插拔 | 可 | 需要踩坑,但长期最省钱 |
我们团队只有 3 名 Python 后端 + 1 名算法,综合“钱包”和“可控性”后,决定用 TensorFlow 做意图模型,Flask 只负责轻量级 HTTP 接口,复杂对话管理拆到独立微服务。这样既能本地热更新模型,又不会被云厂商 API 的限速卡脖子。
核心实现:BERT 微调 + 状态机 + 消息队列
1. 数据准备:清洗 + 增强
公司内部历史工单 1.8 万条,先过一遍正则清洗:
import re, json, pandas as pd def clean_utterance(text: str) -> str: """ 移除网址、表情、手机号 """ text = re.sub(r'http[s]?://\S+', '', text) text = re.sub(r'1[3-9]\d{9}', '', text) text = text.strip() return text or 'unknown' df = pd.read_csv('raw_ticket.csv') df['text'] = df['text'].apply(clean_utterance) df.to_csv('clean.csv', index=False)再做同义词回译增强:中文→英文→中文,用 googletrans 批量跑,平均每条扩 3 倍,意图类别不变,结果准确率提升 4.3%。
2. BERT 微调意图分类
用 bert-base-chinese,加一层 Dense 输出 18 个意图。关键代码片段:
from transformers import TFBertForSequenceClassification import tensorflow as tf class BertIntent: def __init__(self, num_labels: int, lr: float = 2e-5): self.model = TFBertForSequenceClassification.from_pretrained( 'bert-base-chinese', num_labels=num_labels) self.optimizer = tf.keras.optimizers.Adam(learning_rate=lr) self.loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) @tf.function def train_step(self, x, y): with tf.GradientTape() as tape: logits = self.model(x).logits loss = self.loss(y, logits) gradients = tape.gradient(loss, self.model.trainable_variables) self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables)) return loss训练 5 个 epoch,验证集 F1 0.91,导出 SavedModel 供线上 TF Serving 调用。
3. 对话状态机:Redis + 有限状态机
多轮场景举例:用户想开发价,必须先提供“商品型号+数量+地址”。用 Redis Hash 存状态,key 为session:{user_id},field 存“intent、slot、turn”。
状态转移图如下:
核心代码:
import redis, json from enum import Enum, auto class State(Enum): INIT = auto() AWAIT_MODEL = auto() AWAIT_QTY = auto() AWAIT_CITY = auto() COMPLETE = auto() class DialogManager: def __init__(self, redis_host='127.0.0.1'): self.r = redis.Redis(host=redis_host, decode_responses=True) def get_state(self, uid: str) -> State: s = self.r.hget(f'session:{uid}', 'state') return State[s] if s else State.INIT def transition(self, uid: str, intent: str, slots: dict): cur = self.get_state(uid) if cur == State.INIT and intent == 'quotation': self._save(uid, State.AWAIT_MODEL, slots) return '请问具体型号?' if cur == State.AWAIT_MODEL and intent == 'inform': self._save(uid, State.AWAIT_QTY, slots) return '需要多少件?' # ... 更多分支 return '默认兜底回复' def _save(self, uid: str, state: State, slots: dict): self.r.hset(f'session:{uid}', mapping={ 'state': state.name, 'slots': json.dumps(slots) }) self.r.expire(f'session:{uid}', 600) # TTL 10 分钟4. 异步消息队列:Celery + RabbitMQ
Flask 只负责鉴权 + 把请求丢给 Celery,worker 节点可水平扩展:
from celery import Celery cel = Celery('ai_bot', broker='pyamqp://guest@localhost//') @cel.task(bind=True, max_retries=2) def infer_intent(self, text: str): try: # 调 TF Serving REST resp = requests.post(TF_SERVING_URL, json={"instances": [text]}, timeout=3) return resp.json()['predictions'][0] except requests.exceptions.Timeout: raise self.retry(countdown=1)压测时 worker 开到 8 实例,可稳吃 800 QPS,p99 延迟 220 ms。
代码规范:PEP8 与异常处理示例
关键函数必须写 docstring,异常捕获至少包三层:
def query_knowledge(slots: dict) -> str: """ 根据槽位查询知识库,返回自然语言答案 :param slots: 槽位字典,必须含 model, qty, city :return: 答案文本 :raises: ValueError 参数缺失;RuntimeError 查询失败 """ if not all(k in slots for k in ('model', 'qty', 'city')): raise ValueError('缺少必要槽位') try: r = requests.post(KNOWLEDGE_API, json=slots, timeout=2) r.raise_for_status() except requests.exceptions.RequestException as e: raise RuntimeError('知识库不可用') from e return r.json()['answer']网络超时、JSON 解析、Redis 连接全部 try/except 后打日志,再抛自定义业务异常,方便 Sentry 聚合报警。
生产考量:压测、鉴权与敏感词
1. 压力测试:Locust 脚本
from locust import HttpUser, task, between class BotUser(HttpUser): wait_time = between(1, 2) @task(10) def ask_price(self): self.client.post("/chat", json={"uid": "u100", "text": "这款笔记本多少钱?"})单机 4 进程可模拟 2k 并发,CPU 瓶颈先出现在 TF Serving GPU 节点,Flask 端无恙。
2. JWT 鉴权 + 敏感信息过滤
- 网关层统一校验 JWT,uid 写在 sub 字段,拒绝无 Token 请求。
- 返回前过一遍敏感词树(AC 自动机),命中 ** 替换,并 log 审计。
避坑指南:冷启动与缓存 TTL
- 冷启动无历史数据,模型会胡乱分类。上线前先用“相似问法生成+置信度阈值”双保险:置信度 < 0.65 时走默认回复“亲,转人工客服”。
- TTL 别省内存设太长,实测 10 分钟最佳;超过后用户再说话,状态机回到 INIT,不会张冠李戴。
- GPU 显存碎片化让 TF Serving 自己重启,加
--monitoring_config_file定时收集显存,>90% 就滚动重启,用户侧只损 1 次 502,重试即可。
延伸思考:把知识图谱接进来
当用户问“那款显卡和主板兼容吗?”需要多跳推理。下一步可把商品知识图谱(Neo4j)接入:
- 实体链接:把“显卡”“主板”映射到图谱节点;
- 路径查询:用 Cypher
MATCH p=(c:Card)-[:COMPATIBLE]->(b:Board) RETURN b.name; - 答案生成:把子图序列化喂给 T5 做文本化,返回自然句。
demo 环境跑通后,单轮能回答 2-hop 问题,准确率再提 7%,但图谱更新延迟要 <5 min,否则会出现“刚下架的商品仍推荐”的尴尬。
以上就是在生产环境跑通“AI 机器人智能体客服”的完整路线。回头看,最大感受是:别把全部希望押在模型准确率,状态机和工程容错才是让用户“不骂娘”的关键。先把骨架搭稳,再一点点喂数据、调阈值、加图谱,客服机器人才有机会从“人工智障”进化成“人工智能”。祝你落地顺利,少踩坑,多睡觉。