coze搭建智能客服记录对话内容并分析购买意向
1. 背景痛点:对话数据“乱成一锅粥”
做电商客服的同学都懂,用户一句“有没有优惠”可能出现在售前、售中、售后任何一个环节。传统工单系统把聊天记录当普通日志一扔了之,检索全靠 Ctrl+F,结果:
- 活动复盘时,运营想拉“所有问过优惠券且最终下单”的用户,研发得写脚本全表扫描,动辄半小时。
- 规则引擎写“关键词=优惠→意向=高”,结果用户说“优惠力度不如隔壁”,被误判为高意向,客服跟进后转化率不到5%。
一句话:非结构化数据+僵硬规则,让“对话记录”变“对话累赘”,意向识别也陷入准确率瓶颈。
2. 技术方案:让Coze当“中枢”,BERT当“翻译”
2.1 存储选型:MongoDB vs PostgreSQL
- MongoDB:文档型,单条对话自带数组式消息体,水平扩展友好;但复杂事务弱,统计SQL要写聚合管道。
- PostgreSQL:jsonb也能存半结构数据,事务强,窗口函数一把梭;但高并发写放大,磁盘IO容易顶到瓶颈。
实测百万级对话、读多写少场景,MongoDB综合延迟更低,最终敲定MongoDB做主存,PostgreSQL做日批报表库。
2.2 Coze消息持久化API
Coze对外暴露conversation.append与conversation.snapshot两个钩子:
append:用户每发一句,平台POST给我们,含user_id、msg_id、timestamp、raw_text。snapshot:客服关闭会话时触发,一次性推送整段对话数组。
官方要求:
- 响应200且在3s内回ACK,否则重推。
- 需回传msg_id列表做幂等,避免重复计算意向。
2.3 BERT意图分类工程化
直接用bert-base-chinese微调三分类:高意向/中意向/无意向。训练样本1.2万条,F1=0.89。上线时把模型转ONNX,配合ONNXRuntime-GPU,P99延迟<80ms,满足实时。
3. 核心实现:Python代码全链路
3.1 对话数据ETL管道
# pipelines/save_dialog.py import os, pymongo, requests, backoff from datetime import datetime MONGO_URI = os.getenv("MONGO_URI") client = pymongo.MongoClient(MONGO_URI) db = client["coze_dialog"] col = db["dialog_raw"] @backoff.on_exception(backoff.expo, requests.exceptions.Timeout, max_time=30) def ack_to_coze(msg_ids: list): """必须回ACK,保证幂等""" return requests.post( "https://api.coze.com/v1/ack", json={"msg_ids": msg_ids}, timeout=2 ) def process(batch: list): """批量写+异常重试""" for msg in batch: msg["stored_at"] = datetime.utcnow() try: col.insert_many(batch, ordered=False) ack_to_coze([m["msg_id"] for m in batch]) except pymongo.errors.BulkWriteError as e: # 重复msg_id会抛出,捕获后继续ACK ack_to_coze([m["msg_id"] for m in batch]) # Flask钩子入口 from flask import Flask, request, jsonify app = Flask(__name__) @app.route("/coze", methods=["POST"]) def ingest(): data = request.get_json() process(data["events"]) # events即对话数组 return jsonify({"code": 200})3.2 实时意向分析RESTful接口
# services/intent_svc.py from fastapi import FastAPI from pydantic import BaseModel from transformers import AutoTokenizer, AutoModelForSequenceClassification import torch, onnxruntime as ort app = FastAPI() sess = ort.InferenceSession("intent_cls.onnx") tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese") class Item(BaseModel): text: str @app.post("/predict") def predict(item: Item): inputs = tokenizer(item.text, return_tensors="pt", truncation=True, max_length=64) logits = sess.run(None, { "input_ids": inputs["input_ids"].numpy(), "attention_mask": inputs["attention_mask"].numpy() })[0] prob = torch.softmax(torch.tensor(logits), dim=1) label = ["无", "中", "高"][int(prob.argmax())] return {"label": label, "prob": round(float(prob.max()), 3)}3.3 事件驱动架构图
- Coze → 消息网关 → Kafka → Flink → MongoDB & 意向服务
- 采用Kafka削峰,单partition可支撑5k QPS;Flink做10s窗口窗口聚合,回写Redis供大屏实时查询。
4. 生产考量:高并发与合规
4.1 百万QPS下的Redis缓存
- 读热点:用户实时意向分,采用
String+TTL=300s,本地布隆过滤器防穿透。 - 写热点:会话快照推MongoDB后,写Redis队列供后续消息系统消费,用
list+lpush+brpop减轻mongo读压力。 - 集群:Redis-cluster,16分片,每片上限8G;开启
activedefrag避免大key。
4.2 GDPR合规过滤
- 敏感词正则+NER双重过滤,手机、身份证、邮箱一律掩码。
- 提供
/data/erase接口,收到用户注销指令后,异步扫描MongoDB+S3,72h内物理删除,记录审计日志。
5. 避坑指南:踩过的坑,帮你先填
5.1 会话超时处理
常见错误:把Redis TTL当“会话结束”信号,结果用户短暂离线>30min,TTL失效,新消息生成新session,导致同一次购买被拆成两条对话,意向分失真。
正确姿势:Coze自带session.expire事件,监听该事件再标记数据库is_finished=true,保证统计口径一致。
5.2 模型热更新
ONNX模型文件替换瞬间,推理进程可能加载到一半,导致shape不匹配直接coredump。
方案:采用双模型目录models/v1、models/v2,更新时先把新版本写v2,发送USR1信号,服务捕捉信号后atomic地切换session指针,旧请求继续跑完,新请求用新模型,实现零中断。
6. 效果复盘
上线两周,客服人均日处理会话量从220条提升到310条,响应速度提升42%;高意向用户识别准确率由73%提到88%,营销转化率提高5.7个百分点。老板原话:“终于不用天天拉会复盘谁漏跟了高意向客户。”
7. 延伸思考题
- 长对话超过50轮时,BERT 512 token上限导致上下文截断,意向分骤降,你会如何改造模型?
- 如果平台要求“对话实时翻译英文再分析”,你会把翻译模块插在哪一环节,才能保证延迟<200ms?
- 当用户开启语音输入,语音识别错误率10%,你会如何设计鲁棒意图识别,避免错误文本放大偏差?
把这三个问题想透,你的智能客服就离“真正懂用户”更近一步。祝各位落地顺利,少踩坑,多拿绩效。