一、核心内容
接口层落地核心内容,聚焦解决 “智能客服的后端接口标准化、LangGraph 工作流与 API 无缝对接、请求响应处理、异常兼容” 问题,实现从 “本地工作流” 到 “可网络调用的智能客服服务” 的转化,为前端 / Vue 对接提供稳定接口支撑。
二、FastAPI 核心接口设计(实战重点)
视频围绕智能客服的核心场景(用户提问、会话管理、历史查询),设计 RESTful 风格接口,兼顾易用性与扩展性:
(一)接口开发前置准备
- 安装核心依赖
bash
运行
# FastAPI核心+Uvicorn服务器 pip install fastapi uvicorn # 数据校验+序列化 pip install pydantic python-multipart # LangGraph依赖(复用之前工作流) pip install langgraph langchain- 项目目录结构(接口层 + 工作流层分离)
plaintext
fastapi-langgraph-cs/ ├── main.py # FastAPI入口文件(接口定义) ├── schemas.py # 请求/响应数据模型(Pydantic) ├── langgraph_workflow/ # LangGraph智能客服工作流(复用/改造) │ ├── __init__.py │ ├── agent.py # 客服智能体 │ └── workflow.py # 对话工作流(问答/上下文管理) └── utils/ # 工具类(日志/异常处理) ├── __init__.py └── log_util.py(二)核心数据模型(schemas.py)
使用 Pydantic 定义请求 / 响应格式,确保数据校验与类型安全:
python
运行
from pydantic import BaseModel from typing import Optional, List, Dict from datetime import datetime # 会话基础信息模型 class SessionInfo(BaseModel): session_id: str # 会话唯一标识(前端传递/后端生成) user_id: Optional[str] = None # 用户ID(可选) create_time: Optional[datetime] = datetime.now() # 会话创建时间 # 提问请求模型(用户输入) class QueryRequest(BaseModel): session_info: SessionInfo query: str # 用户提问内容 context: Optional[List[Dict[str, str]]] = None # 历史对话上下文([{role: user/assistant, content: ...}]) top_k: Optional[int] = 3 # RAG检索TopK(可选,默认3) # 回答响应模型(后端返回) class QueryResponse(BaseModel): code: int = 200 # 状态码(200成功/500失败) message: str = "success" # 状态描述 data: Optional[Dict] = None # 响应数据 error: Optional[str] = None # 错误信息(失败时返回) # 历史对话查询请求 class HistoryRequest(BaseModel): session_id: str # 会话ID user_id: Optional[str] = None # 历史对话响应 class HistoryResponse(BaseModel): code: int = 200 message: str = "success" data: Optional[List[Dict[str, str]]] = None # 历史对话列表(三)核心接口实现(main.py)
定义 3 个核心接口,覆盖智能客服完整流程:
python
运行
from fastapi import FastAPI, HTTPException from schemas import QueryRequest, QueryResponse, HistoryRequest, HistoryResponse from langgraph_workflow.workflow import客服_workflow # 复用LangGraph工作流 from utils.log_util import logger import uuid # 创建FastAPI实例 app = FastAPI(title="LangGraph+FastAPI智能客服接口", version="1.0.0") # 1. 智能客服提问接口(核心) @app.post("/api/chat/query", response_model=QueryResponse) async def chat_query(request: QueryRequest) -> QueryResponse: try: # 生成/验证会话ID session_id = request.session_info.session_id or str(uuid.uuid4()) logger.info(f"会话{session_id}:用户提问 - {request.query}") # 调用LangGraph工作流处理(问答+上下文管理) workflow_result = 客服_workflow.run( session_id=session_id, query=request.query, context=request.context, top_k=request.top_k ) # 构造响应 return QueryResponse( data={ "session_id": session_id, "response": workflow_result["answer"], # 客服回答 "context": workflow_result["updated_context"], # 更新后的上下文 "retrieved_docs": workflow_result.get("retrieved_docs", []) # RAG检索文档(可选返回) } ) except Exception as e: logger.error(f"会话{request.session_info.session_id}:提问处理失败 - {str(e)}") return QueryResponse( code=500, message="failed", error=str(e) ) # 2. 历史对话查询接口 @app.post("/api/chat/history", response_model=HistoryResponse) async def chat_history(request: HistoryRequest) -> HistoryResponse: try: # 调用工作流的历史查询方法 history = 客服_workflow.get_history(session_id=request.session_id, user_id=request.user_id) return HistoryResponse(data=history) except Exception as e: logger.error(f"查询会话{request.session_id}历史失败 - {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # 3. 会话关闭接口(可选) @app.post("/api/chat/close") async def close_session(session_id: str) -> dict: try: # 清理会话资源(如缓存、临时数据) 客服_workflow.close_session(session_id=session_id) return {"code": 200, "message": "会话关闭成功"} except Exception as e: return {"code": 500, "message": f"会话关闭失败:{str(e)}"} # 启动服务器(本地调试) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)三、LangGraph 工作流与 FastAPI 集成(核心联动)
视频重点讲解如何改造原有 LangGraph 工作流,适配 FastAPI 的接口调用场景,核心是 “会话隔离、上下文管理、异步兼容”:
(一)LangGraph 工作流改造(workflow.py)
python
运行
from langgraph import Graph from langgraph_workflow.agent import CustomerServiceAgent from typing import Dict, List, Optional import json import os # 初始化客服智能体(含RAG检索、对话生成能力) cs_agent = CustomerServiceAgent() # 定义工作流State(适配接口传递的会话/上下文数据) class ChatWorkflowState: session_id: str # 会话ID query: str # 当前查询 context: List[Dict[str, str]] # 历史上下文 top_k: int # RAG检索TopK answer: Optional[str] = None # 生成的回答 updated_context: Optional[List[Dict[str, str]]] = None # 更新后的上下文 retrieved_docs: Optional[List[str]] = None # 检索到的文档 # 工作流节点1:RAG检索节点 def retrieve_node(state: ChatWorkflowState) -> ChatWorkflowState: # 调用智能体的RAG检索能力 retrieved_docs = cs_agent.retrieve(query=state.query, top_k=state.top_k) state.retrieved_docs = retrieved_docs return state # 工作流节点2:回答生成节点 def generate_answer_node(state: ChatWorkflowState) -> ChatWorkflowState: # 结合上下文+检索文档生成回答 answer = cs_agent.generate_answer( query=state.query, context=state.context, retrieved_docs=state.retrieved_docs ) state.answer = answer # 更新上下文(添加当前问答) state.updated_context = state.context + [ {"role": "user", "content": state.query}, {"role": "assistant", "content": answer} ] # 保存上下文到本地(JSON文件,生产可用数据库) save_context(state.session_id, state.updated_context) return state # 辅助函数:保存上下文到本地 def save_context(session_id: str, context: List[Dict[str, str]]): context_dir = "./chat_contexts" os.makedirs(context_dir, exist_ok=True) with open(os.path.join(context_dir, f"{session_id}.json"), "w", encoding="utf-8") as f: json.dump(context, f, ensure_ascii=False, indent=2) # 构建LangGraph工作流 graph = Graph(ChatWorkflowState) graph.add_node("retrieve", retrieve_node) # 检索节点 graph.add_node("generate_answer", generate_answer_node) # 生成节点 graph.add_edge("retrieve", "generate_answer") # 检索→生成 graph.set_entry_point("retrieve") # 入口节点 graph.set_finish_point("generate_answer") # 结束节点 # 编译工作流 客服_workflow = graph.compile() # 工作流扩展方法:获取历史对话 def get_history(self, session_id: str, user_id: Optional[str] = None) -> List[Dict[str, str]]: context_path = os.path.join("./chat_contexts", f"{session_id}.json") if os.path.exists(context_path): with open(context_path, "r", encoding="utf-8") as f: return json.load(f) return [] # 工作流扩展方法:关闭会话(清理上下文) def close_session(self, session_id: str): context_path = os.path.join("./chat_contexts", f"{session_id}.json") if os.path.exists(context_path): os.remove(context_path) # 给工作流绑定扩展方法 客服_workflow.get_history = get_history.__get__(客服_workflow) 客服_workflow.close_session = close_session.__get__(客服_workflow)(二)客服智能体实现(agent.py)
封装 RAG 检索与对话生成核心能力:
python
运行
from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain.prompts import PromptTemplate from typing import List class CustomerServiceAgent: def __init__(self): # 初始化嵌入模型(OpenAI Embeddings,可替换为本地模型) self.embeddings = OpenAIEmbeddings() # 初始化向量数据库(Chroma,本地持久化) self.vector_db = Chroma( persist_directory="./chroma_db", embedding_function=self.embeddings ) # 初始化大模型(ChatGPT,可替换为本地大模型如Llama3) self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3) # 对话提示模板 self.prompt_template = PromptTemplate( input_variables=["query", "context", "retrieved_docs"], template=""" 你是专业的智能客服,基于以下信息回答用户问题: 1. 历史对话上下文:{context} 2. 相关参考文档:{retrieved_docs} 要求: - 回答必须基于提供的信息,不编造内容; - 语言简洁明了,贴合用户问题; - 若没有相关信息,回复"抱歉,暂时无法解答你的问题"。 用户问题:{query} """ ) def retrieve(self, query: str, top_k: int = 3) -> List[str]: """RAG检索相关文档""" docs = self.vector_db.similarity_search(query, k=top_k) return [doc.page_content for doc in docs] def generate_answer(self, query: str, context: List[Dict[str, str]], retrieved_docs: List[str]) -> str: """生成回答(结合上下文+检索文档)""" # 格式化上下文 context_str = "\n".join([f"{item['role']}:{item['content']}" for item in context]) # 格式化检索文档 docs_str = "\n".join([f"参考文档{i+1}:{doc}" for i, doc in enumerate(retrieved_docs)]) # 构建提示词 prompt = self.prompt_template.format( query=query, context=context_str, retrieved_docs=docs_str ) # 调用大模型生成回答 response = self.llm.invoke(prompt) return response.content四、接口测试与部署准备
(一)本地接口测试
- 启动 FastAPI 服务器
bash
运行
uvicorn main:app --reload --host 0.0.0.0 --port 8000- 访问自动生成的 API 文档(保姆级测试入口)
- Swagger UI:http://localhost:8000/docs
- ReDoc:http://localhost:8000/redoc
- 测试流程
- 打开 Swagger UI,找到
/api/chat/query接口,点击 “Try it out” - 输入请求示例:
json
{ "session_info": { "session_id": "test-session-001", "user_id": "user-001" }, "query": "什么是智能客服?", "context": [], "top_k": 3 }- 点击 “Execute”,查看返回的客服回答与上下文。
(二)部署准备(生产环境适配)
- 关闭调试模式,设置日志级别为 INFO
- 替换本地向量数据库(Chroma)为生产级数据库(如 Milvus、Pinecone)
- 大模型替换为企业级部署(如私有化部署的 Llama3、通义千问)
- 接口添加认证(如 API Key、JWT Token),避免未授权访问:
python
运行
# 添加API Key认证示例 from fastapi.security import APIKeyHeader from fastapi import Security, HTTPException API_KEY = "your-production-api-key" api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) def get_api_key(api_key: str = Security(api_key_header)): if api_key != API_KEY: raise HTTPException(status_code=401, detail="无效的API Key") return api_key # 给接口添加认证依赖 @app.post("/api/chat/query", response_model=QueryResponse, dependencies=[Depends(get_api_key)])五、常见问题与避坑指南
| 问题现象 | 核心原因 | 解决方案 |
|---|---|---|
| 接口调用提示 “401 Unauthorized” | 生产环境未传递 API Key 或 Key 无效 | 1. 前端请求头添加X-API-Key: 你的API Key;2. 核对 Key 是否与后端配置一致 |
| 大模型调用超时 / 失败 | 网络问题(OpenAI API)或本地模型未启动 | 1. 配置大模型超时时间(timeout=30);2. 切换为本地部署的大模型;3. 添加重试机制 |
| 上下文数据过大导致接口响应慢 | 历史对话上下文未做长度限制 | 1. 前端限制上下文长度(如仅传递最近 10 轮对话);2. 后端对上下文进行截断处理;3. 大文本数据压缩传输 |
| 向量数据库检索结果不准确 | 嵌入模型与文档适配性差,或 TopK 设置不合理 | 1. 更换更贴合业务的嵌入模型(如通义 Embedding);2. 调整 TopK 值(建议 3-5);3. 对文档进行预处理(分段、去重) |
| FastAPI 服务器启动失败(端口占用) | 8000 端口被其他程序占用 | 1. 更换端口(--port 8080);2. 关闭占用端口的程序;3. 配置随机端口 |