最近在做一个智能客服系统的升级项目,老系统基于规则匹配,遇到复杂问题或者多轮对话时,体验确实一言难尽。用户反馈经常答非所问,或者聊着聊着就忘了之前说过什么。技术指标上也很明显,P99响应延迟经常超过2秒,长对话的意图准确率掉得厉害。这促使我们去寻找一个能更好管理对话状态和流程的框架,最终选型并实践了LangGraph,效果提升显著。这里把整个架构设计和实战中的一些关键点记录下来,供有类似需求的同学参考。
背景痛点:为什么规则引擎不够用了?
我们之前的系统是典型的“if-else”规则树加上一个基础的意图分类模型。这套架构在初期简单场景下跑得还行,但随着业务复杂,问题就暴露出来了。
- 上下文断裂:这是最头疼的。用户问“我想订一张明天去北京的机票”,系统识别出“订机票”意图。用户接着问“那后天呢?”,规则引擎很难把“后天”和上一句的“订机票”意图关联起来,常常会当作一个新问题处理,导致对话逻辑断裂。
- 意图识别僵化:规则和传统分类模型对表达方式的泛化能力弱。用户说“帮我找个能修马桶的师傅”和“马桶堵了,急需上门疏通”,本质是同一个“预约维修”意图,但句式不同就可能匹配失败或触发不同分支。
- 流程维护成本高:每个业务场景的对话流程(比如退货、查物流、转人工)都需要手动编写大量的状态跳转逻辑。增加一个新业务,开发和测试周期很长,而且各流程间的状态难以复用和共享。
- 性能瓶颈:为了维持对话状态,我们用了外部数据库存储会话历史。每次请求都需要“读历史->分类->查规则->写历史”,链条长,导致P99延迟很高(>2s),在流量高峰时体验更差。
这些痛点让我们意识到,需要一个有状态的、可编程的、且能清晰定义对话流程的框架来重构核心引擎。
技术选型:为什么是LangGraph?
在LLM应用开发栈里,LangChain知名度很高,我们最初也用它做了原型。但在深入设计客服这种多轮、有状态、需严格流程控制的场景时,我们发现LangGraph更对味。
简单来说,LangChain更像一个丰富的“工具箱”,提供了各种与LLM交互的组件(Chains, Agents, Tools)。而LangGraph则是一个专注于构建有状态多智能体工作流的框架,它明确引入了“状态”的概念和基于图的执行模型。
两者的核心差异点:
- 状态管理:LangGraph内置了
StateGraph和GraphState的概念,状态在整个图的执行过程中自动传递和更新,这是实现连贯对话的基石。在LangChain中,状态传递需要开发者自己通过链式调用的输入输出来管理,不够直观和结构化。 - 执行模型:LangGraph将工作流明确定义为一个有向无环图(DAG)。节点是处理函数(如:意图识别、信息查询、回复生成),边定义了节点的执行顺序和条件。这种图结构让复杂的、带分支的对话流程变得一目了然,且易于调试。LangChain的SequentialChain虽然也是顺序执行,但缺乏这种显式的、可视化的图结构来描述复杂分支。
- 并行与循环:LangGraph天然支持基于条件的分支(
conditional edges)和循环(将边指向之前的节点),这对于实现“根据识别结果走不同流程”或“信息不足时追问用户”的客服场景至关重要。在LangChain中实现类似逻辑会更复杂。
下图展示了一个简化的智能客服LangGraph工作流架构:
这个图清晰地描述了消息处理的管道:先进行预处理和敏感词过滤,然后进行意图识别,根据意图结果决定是走“信息查询”、“业务办理”还是“转人工”路径,最后统一生成回复。这种设计让系统流程变得非常清晰。
核心实现:构建Graph与状态管理
1. 定义GraphState
一切始于状态的定义。我们用一个Pydantic模型来封装整个对话生命周期中需要维护的所有数据。
from typing import TypedDict, List, Annotated import operator from langgraph.graph import StateGraph, END from pydantic import BaseModel class GraphState(TypedDict): """ 定义图执行过程中的状态流。 所有节点函数都接收并返回这个状态字典。 """ # 用户当前输入的问题 user_input: str # 清理后的输入(如去除敏感词后) cleaned_input: str # 识别出的意图,如 'greeting', 'query_order', 'complain' intent: str # 从输入中提取的实体信息,如订单号、日期等 entities: dict # 对话历史,用于维护上下文 chat_history: List[Annotated[dict, operator.add]] # 当前节点收集到的、用于最终回复的信息 collected_data: dict # 标记是否需要人工介入 need_human: bool2. 构建工作流图
接下来,我们创建StateGraph,并添加节点和边。
# 初始化图 workflow = StateGraph(GraphState) # 1. 定义各个节点函数 def preprocess_node(state: GraphState) -> GraphState: """节点1:输入预处理,包括敏感词过滤、标准化等。""" cleaned = sensitive_filter(state['user_input']) state['cleaned_input'] = cleaned return state def intent_classify_node(state: GraphState) -> GraphState: """节点2:意图识别,调用LLM或分类模型。""" # 这里可以结合chat_history进行更精准的分类 intent = call_intent_model(state['cleaned_input'], state['chat_history']) state['intent'] = intent return state def route_node(state: GraphState) -> GraphState: """节点3:路由决策,根据意图决定下一个节点。""" intent = state['intent'] if intent == 'query_order': return 'query_db_node' elif intent == 'cancel_order': return 'business_flow_node' elif intent == 'human': state['need_human'] = True return 'generate_response_node' # 直接跳转到生成回复,准备转人工话术 else: return 'general_qa_node' def query_db_node(state: GraphState) -> GraphState: """节点4:查询数据库。""" order_id = state['entities'].get('order_number') data = database.query_order(order_id) state['collected_data'] = data return state def generate_response_node(state: GraphState) -> GraphState: """最终节点:合成所有信息,生成最终回复。""" if state['need_human']: response = "您的问题需要人工协助,正在为您转接..." else: response = llm_generate(state['chat_history'], state['collected_data']) # 将本轮对话加入历史 state['chat_history'].append({ 'user': state['user_input'], 'assistant': response }) state['response'] = response return state # 2. 将节点添加到图中 workflow.add_node("preprocess", preprocess_node) workflow.add_node("intent_classify", intent_classify_node) workflow.add_node("route", route_node) workflow.add_node("query_db", query_db_node) workflow.add_node("generate_response", generate_response_node) # 3. 设置图的入口和边 workflow.set_entry_point("preprocess") workflow.add_edge("preprocess", "intent_classify") workflow.add_edge("intent_classify", "route") # 条件边:由route_node的返回值动态决定下一个节点 workflow.add_conditional_edges( "route", route_node, # 这个函数返回下一个节点的名称 { "query_db_node": "query_db", "business_flow_node": "business_flow_node", # 其他节点需定义 "general_qa_node": "general_qa_node", "generate_response_node": "generate_response" } ) workflow.add_edge("query_db", "generate_response") workflow.add_edge("generate_response", END) # 4. 编译图 app = workflow.compile()3. 对话记忆的持久化存储
在生产环境中,chat_history不能只放在内存里。我们使用Redis作为持久化存储,并利用连接池提升性能。
import redis from redis.connection import ConnectionPool import json import pickle # 注意性能陷阱,后面会讲 class DialogueMemoryManager: def __init__(self, redis_url: str): # 创建Redis连接池,避免频繁创建断开连接的开销 self.pool = ConnectionPool.from_url(redis_url, max_connections=10, decode_responses=False) self._redis = redis.Redis(connection_pool=self.pool) # 设置会话过期时间,例如30分钟无活动则清除 self.session_ttl = 1800 def save_session(self, session_id: str, state: GraphState): """将会话状态保存到Redis。""" # 关键:对state进行序列化。GraphState本质是字典。 # 简单场景可以用json,但复杂对象可能需pickle。 serialized_state = pickle.dumps(state) self._redis.setex(f"chat_session:{session_id}", self.session_ttl, serialized_state) def load_session(self, session_id: str) -> GraphState: """从Redis加载会话状态。""" data = self._redis.get(f"chat_session:{session_id}") if data: return pickle.loads(data) # 如果不存在,返回一个初始化的空状态 return { 'user_input': '', 'cleaned_input': '', 'intent': '', 'entities': {}, 'chat_history': [], 'collected_data': {}, 'need_human': False } def add_to_history(self, session_id: str, user_msg: str, assistant_msg: str): """便捷方法:直接更新历史记录。""" state = self.load_session(session_id) state['chat_history'].append({'user': user_msg, 'assistant': assistant_msg}) self.save_session(session_id, state) # 使用示例 memory_manager = DialogueMemoryManager("redis://localhost:6379/0") # 在处理请求时 session_id = request.headers.get('X-Session-ID') initial_state = memory_manager.load_session(session_id) initial_state['user_input'] = user_query # 运行图 final_state = app.invoke(initial_state) # 保存更新后的状态 memory_manager.save_session(session_id, final_state) response = final_state['response']4. 异常处理与熔断策略
分布式系统中,对LLM或外部服务的调用必须要有熔断机制,防止一个慢节点拖垮整个系统。
from circuitbreaker import circuit import time class ExternalServiceUnavailable(Exception): pass @circuit(failure_threshold=5, expected_exception=ExternalServiceUnavailable, recovery_timeout=30) def call_llm_with_fallback(prompt: str, chat_history: list) -> str: """ 调用LLM服务,集成熔断器。 failure_threshold: 连续失败5次后熔断 recovery_timeout: 熔断30秒后进入半开状态尝试恢复 """ try: # 模拟调用,这里替换为实际的LLM API调用 # response = openai_chat_completion(...) response = mock_llm_call(prompt, chat_history) return response except (TimeoutError, ConnectionError) as e: # 记录失败次数,达到阈值后熔断器会打开,后续调用直接快速失败 raise ExternalServiceUnavailable(f"LLM service error: {e}") from e def mock_llm_call(prompt: str, history: list) -> str: # 模拟偶尔的失败 if time.time() % 10 > 8: # 假设20%的失败率 raise TimeoutError("LLM API timeout") return "This is a mock LLM response." # 在节点函数中使用 def general_qa_node(state: GraphState) -> GraphState: try: answer = call_llm_with_fallback(state['cleaned_input'], state['chat_history']) state['collected_data']['answer'] = answer except ExternalServiceUnavailable: # 熔断打开或服务不可用,使用降级策略 state['collected_data']['answer'] = "系统当前繁忙,请稍后再试。" state['need_human'] = True # 可以考虑转人工 return state生产考量:让系统健壮可靠
1. 负载测试方案
系统上线前,我们用Locust进行了压力测试,模拟用户并发提问。
# locustfile.py from locust import HttpUser, task, between import uuid class ChatbotUser(HttpUser): wait_time = between(1, 3) # 用户思考时间1-3秒 session_id = str(uuid.uuid4()) @task def send_message(self): headers = {"X-Session-ID": self.session_id, "Content-Type": "application/json"} # 模拟不同类型的用户问题 questions = [ "我的订单123456到哪里了?", "怎么退货?", "人工客服", "谢谢你的帮助。" ] import random payload = {"message": random.choice(questions)} # 发送请求到我们的智能客服API端点 response = self.client.post("/v1/chat", json=payload, headers=headers) # 可以在这里对响应时间或状态码进行断言 assert response.status_code == 200 assert response.elapsed.total_seconds() < 2.0 # P95响应时间应小于2秒通过调整并发用户数,我们找到了系统的瓶颈(最初是Redis连接数),并进行了优化。
2. API网关与JWT鉴权
所有请求首先经过API网关(如Kong或Nginx + Lua),在这里完成统一的鉴权、限流和日志记录。
- JWT验证:网关验证请求头中的
Authorization: Bearer <token>。JWT的payload中应包含租户ID(tenant_id)、用户ID(user_id)和权限范围。验证通过后,网关将X-Tenant-ID和X-User-ID等头信息转发给后端的智能客服服务。 - 会话亲和性(Session Affinity):虽然状态存在Redis,但为了进一步提升缓存效率,网关可以根据
X-Session-ID将同一会话的请求路由到同一个后端服务实例。
3. 敏感词过滤的异步优化
敏感词过滤是一个CPU密集型或I/O密集型(如果词库在DB)操作。为了不阻塞主对话流程,我们将其异步化。
import asyncio import ahocorasick # 使用Aho-Corasick算法进行高效多模式匹配 class AsyncSensitiveFilter: def __init__(self): self.automaton = ahocorasick.Automaton() # 初始化加载敏感词库 self.load_keywords() self.automaton.make_automaton() def load_keywords(self): # 从文件或数据库加载 keywords = ["违规词1", "不良词2"] for idx, word in enumerate(keywords): self.automaton.add_word(word, (idx, word)) async def filter_text(self, text: str) -> str: """异步过滤文本,返回清理后的文本和标记。""" # 将计算密集型任务放到线程池中运行,避免阻塞事件循环 loop = asyncio.get_event_loop() cleaned_text, found_words = await loop.run_in_executor( None, self._sync_filter, text ) if found_words: # 记录日志或触发告警 log_sensitive_attempt(found_words) return cleaned_text def _sync_filter(self, text: str): found_positions = [] # 找出所有敏感词位置 for end_index, (_, original_word) in self.automaton.iter(text): start_index = end_index - len(original_word) + 1 found_positions.append((start_index, end_index)) # 根据位置进行替换(如替换为*) # ... 替换逻辑 ... cleaned = text for start, end in reversed(found_positions): # 从后往前替换,避免索引变化 cleaned = cleaned[:start] + '*' * (end - start + 1) + cleaned[end+1:] return cleaned, [text[start:end+1] for start, end in found_positions] # 在预处理节点中异步调用 async def preprocess_node_async(state: GraphState) -> GraphState: filter_tool = AsyncSensitiveFilter() cleaned = await filter_tool.filter_text(state['user_input']) state['cleaned_input'] = cleaned return state避坑指南:实践中遇到的挑战
1. 避免循环依赖的Graph设计
在定义图时,如果不小心让A节点指向B,B又指回A,且没有终止条件,就会形成死循环。LangGraph虽然允许循环(用于多轮交互),但必须有明确的退出条件。
建议模式:对于需要循环的流程(比如不断追问用户直到信息收集完整),设计一个专门的“信息收集”子图或节点。该节点检查collected_data是否满足条件,如果满足,就通过条件边指向生成回复节点;如果不满足,则生成一个追问问题并暂停,将问题返回给用户,等待用户下一次输入。循环的驱动者是用户的连续请求,而不是图内部的无限循环。
2. 对话状态序列化的性能陷阱
我们最初直接使用pickle序列化整个state字典。后来发现,当chat_history变得很长时,序列化和反序列化的开销巨大,Redis的读写延迟明显增加。
优化方案:
- 分离存储:将频繁更新的
chat_history和相对稳定的intent、entities等分开存储。chat_history可以存成Redis List,用LPUSH和LRANGE操作,避免每次读写整个大对象。 - 压缩历史:不要无限制存储原始对话。可以使用LLM对长历史进行摘要(Summarization),只保留摘要和最近几轮对话,大幅减少数据量。
- 选择序列化工具:对于剩下的状态字典,可以测试
msgpack、orjson等相比pickle更高效、更安全的序列化库。
3. 多租户场景下的资源隔离
一个系统服务多个客户(租户)时,隔离至关重要。
- 数据隔离:在Redis键设计中加入
tenant_id,如chat_session:{tenant_id}:{session_id}。确保一个租户的数据绝不会被另一个租户访问到。 - 计算资源隔离:可以为不同优先级的租户配置不同的LLM调用队列和限流策略。例如,VIP租户的请求进入高优先级队列,并使用更快的LLM模型。
- 配置隔离:每个租户可能有自己的敏感词库、业务规则和问答知识库。这些配置需要在系统层面支持按租户加载和切换。
结语与开放性问题
经过这次基于LangGraph的重构,我们的智能客服系统在吞吐量上提升了约40%,冷启动(新会话首句)耗时降低了30%,更重要的是,长对话的意图保持和用户体验有了质的飞跃。LangGraph的图编程模型让复杂的对话逻辑变得可描述、可维护。
当然,引入LLM和如此灵活的框架也带来了新的挑战和思考:
- 成本与速度的平衡:LLM的调用是主要的成本和延迟来源。如何设计策略?是否所有请求都需要调用LLM?能否用更小的模型处理简单意图,大模型只处理复杂问题?缓存LLM的常见回答是否可行且安全?
- 评估与迭代:如何自动化评估这种基于图的智能客服的效果?传统的准确率指标可能不够,需要结合对话完成率、用户满意度、转人工率等多维度指标。如何建立高效的闭环迭代流程,快速从badcase中学习并更新图节点或流程?
- 图的复杂度管理:当业务极其复杂,图变得非常庞大时,可视化、调试和版本管理会成为新的挑战。是否需要引入“子图”的概念进行模块化?如何对图进行有效的单元测试和集成测试?
这些问题没有标准答案,需要我们在实践中继续探索。希望这篇笔记能为你搭建自己的智能客服系统提供一些切实可行的思路和代码参考。