news 2026/2/9 5:13:40

[大模型架构] LangGraph AI 工作流编排(21)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
[大模型架构] LangGraph AI 工作流编排(21)
一、核心内容

接口层落地核心内容,聚焦解决 “智能客服的后端接口标准化、LangGraph 工作流与 API 无缝对接、请求响应处理、异常兼容” 问题,实现从 “本地工作流” 到 “可网络调用的智能客服服务” 的转化,为前端 / Vue 对接提供稳定接口支撑。

二、FastAPI 核心接口设计(实战重点)

视频围绕智能客服的核心场景(用户提问、会话管理、历史查询),设计 RESTful 风格接口,兼顾易用性与扩展性:

(一)接口开发前置准备
  1. 安装核心依赖

bash

运行

# FastAPI核心+Uvicorn服务器 pip install fastapi uvicorn # 数据校验+序列化 pip install pydantic python-multipart # LangGraph依赖(复用之前工作流) pip install langgraph langchain
  1. 项目目录结构(接口层 + 工作流层分离)

plaintext

fastapi-langgraph-cs/ ├── main.py # FastAPI入口文件(接口定义) ├── schemas.py # 请求/响应数据模型(Pydantic) ├── langgraph_workflow/ # LangGraph智能客服工作流(复用/改造) │ ├── __init__.py │ ├── agent.py # 客服智能体 │ └── workflow.py # 对话工作流(问答/上下文管理) └── utils/ # 工具类(日志/异常处理) ├── __init__.py └── log_util.py
(二)核心数据模型(schemas.py)

使用 Pydantic 定义请求 / 响应格式,确保数据校验与类型安全:

python

运行

from pydantic import BaseModel from typing import Optional, List, Dict from datetime import datetime # 会话基础信息模型 class SessionInfo(BaseModel): session_id: str # 会话唯一标识(前端传递/后端生成) user_id: Optional[str] = None # 用户ID(可选) create_time: Optional[datetime] = datetime.now() # 会话创建时间 # 提问请求模型(用户输入) class QueryRequest(BaseModel): session_info: SessionInfo query: str # 用户提问内容 context: Optional[List[Dict[str, str]]] = None # 历史对话上下文([{role: user/assistant, content: ...}]) top_k: Optional[int] = 3 # RAG检索TopK(可选,默认3) # 回答响应模型(后端返回) class QueryResponse(BaseModel): code: int = 200 # 状态码(200成功/500失败) message: str = "success" # 状态描述 data: Optional[Dict] = None # 响应数据 error: Optional[str] = None # 错误信息(失败时返回) # 历史对话查询请求 class HistoryRequest(BaseModel): session_id: str # 会话ID user_id: Optional[str] = None # 历史对话响应 class HistoryResponse(BaseModel): code: int = 200 message: str = "success" data: Optional[List[Dict[str, str]]] = None # 历史对话列表
(三)核心接口实现(main.py)

定义 3 个核心接口,覆盖智能客服完整流程:

python

运行

from fastapi import FastAPI, HTTPException from schemas import QueryRequest, QueryResponse, HistoryRequest, HistoryResponse from langgraph_workflow.workflow import客服_workflow # 复用LangGraph工作流 from utils.log_util import logger import uuid # 创建FastAPI实例 app = FastAPI(title="LangGraph+FastAPI智能客服接口", version="1.0.0") # 1. 智能客服提问接口(核心) @app.post("/api/chat/query", response_model=QueryResponse) async def chat_query(request: QueryRequest) -> QueryResponse: try: # 生成/验证会话ID session_id = request.session_info.session_id or str(uuid.uuid4()) logger.info(f"会话{session_id}:用户提问 - {request.query}") # 调用LangGraph工作流处理(问答+上下文管理) workflow_result = 客服_workflow.run( session_id=session_id, query=request.query, context=request.context, top_k=request.top_k ) # 构造响应 return QueryResponse( data={ "session_id": session_id, "response": workflow_result["answer"], # 客服回答 "context": workflow_result["updated_context"], # 更新后的上下文 "retrieved_docs": workflow_result.get("retrieved_docs", []) # RAG检索文档(可选返回) } ) except Exception as e: logger.error(f"会话{request.session_info.session_id}:提问处理失败 - {str(e)}") return QueryResponse( code=500, message="failed", error=str(e) ) # 2. 历史对话查询接口 @app.post("/api/chat/history", response_model=HistoryResponse) async def chat_history(request: HistoryRequest) -> HistoryResponse: try: # 调用工作流的历史查询方法 history = 客服_workflow.get_history(session_id=request.session_id, user_id=request.user_id) return HistoryResponse(data=history) except Exception as e: logger.error(f"查询会话{request.session_id}历史失败 - {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # 3. 会话关闭接口(可选) @app.post("/api/chat/close") async def close_session(session_id: str) -> dict: try: # 清理会话资源(如缓存、临时数据) 客服_workflow.close_session(session_id=session_id) return {"code": 200, "message": "会话关闭成功"} except Exception as e: return {"code": 500, "message": f"会话关闭失败:{str(e)}"} # 启动服务器(本地调试) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
三、LangGraph 工作流与 FastAPI 集成(核心联动)

视频重点讲解如何改造原有 LangGraph 工作流,适配 FastAPI 的接口调用场景,核心是 “会话隔离、上下文管理、异步兼容”:

(一)LangGraph 工作流改造(workflow.py)

python

运行

from langgraph import Graph from langgraph_workflow.agent import CustomerServiceAgent from typing import Dict, List, Optional import json import os # 初始化客服智能体(含RAG检索、对话生成能力) cs_agent = CustomerServiceAgent() # 定义工作流State(适配接口传递的会话/上下文数据) class ChatWorkflowState: session_id: str # 会话ID query: str # 当前查询 context: List[Dict[str, str]] # 历史上下文 top_k: int # RAG检索TopK answer: Optional[str] = None # 生成的回答 updated_context: Optional[List[Dict[str, str]]] = None # 更新后的上下文 retrieved_docs: Optional[List[str]] = None # 检索到的文档 # 工作流节点1:RAG检索节点 def retrieve_node(state: ChatWorkflowState) -> ChatWorkflowState: # 调用智能体的RAG检索能力 retrieved_docs = cs_agent.retrieve(query=state.query, top_k=state.top_k) state.retrieved_docs = retrieved_docs return state # 工作流节点2:回答生成节点 def generate_answer_node(state: ChatWorkflowState) -> ChatWorkflowState: # 结合上下文+检索文档生成回答 answer = cs_agent.generate_answer( query=state.query, context=state.context, retrieved_docs=state.retrieved_docs ) state.answer = answer # 更新上下文(添加当前问答) state.updated_context = state.context + [ {"role": "user", "content": state.query}, {"role": "assistant", "content": answer} ] # 保存上下文到本地(JSON文件,生产可用数据库) save_context(state.session_id, state.updated_context) return state # 辅助函数:保存上下文到本地 def save_context(session_id: str, context: List[Dict[str, str]]): context_dir = "./chat_contexts" os.makedirs(context_dir, exist_ok=True) with open(os.path.join(context_dir, f"{session_id}.json"), "w", encoding="utf-8") as f: json.dump(context, f, ensure_ascii=False, indent=2) # 构建LangGraph工作流 graph = Graph(ChatWorkflowState) graph.add_node("retrieve", retrieve_node) # 检索节点 graph.add_node("generate_answer", generate_answer_node) # 生成节点 graph.add_edge("retrieve", "generate_answer") # 检索→生成 graph.set_entry_point("retrieve") # 入口节点 graph.set_finish_point("generate_answer") # 结束节点 # 编译工作流 客服_workflow = graph.compile() # 工作流扩展方法:获取历史对话 def get_history(self, session_id: str, user_id: Optional[str] = None) -> List[Dict[str, str]]: context_path = os.path.join("./chat_contexts", f"{session_id}.json") if os.path.exists(context_path): with open(context_path, "r", encoding="utf-8") as f: return json.load(f) return [] # 工作流扩展方法:关闭会话(清理上下文) def close_session(self, session_id: str): context_path = os.path.join("./chat_contexts", f"{session_id}.json") if os.path.exists(context_path): os.remove(context_path) # 给工作流绑定扩展方法 客服_workflow.get_history = get_history.__get__(客服_workflow) 客服_workflow.close_session = close_session.__get__(客服_workflow)
(二)客服智能体实现(agent.py)

封装 RAG 检索与对话生成核心能力:

python

运行

from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain.prompts import PromptTemplate from typing import List class CustomerServiceAgent: def __init__(self): # 初始化嵌入模型(OpenAI Embeddings,可替换为本地模型) self.embeddings = OpenAIEmbeddings() # 初始化向量数据库(Chroma,本地持久化) self.vector_db = Chroma( persist_directory="./chroma_db", embedding_function=self.embeddings ) # 初始化大模型(ChatGPT,可替换为本地大模型如Llama3) self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3) # 对话提示模板 self.prompt_template = PromptTemplate( input_variables=["query", "context", "retrieved_docs"], template=""" 你是专业的智能客服,基于以下信息回答用户问题: 1. 历史对话上下文:{context} 2. 相关参考文档:{retrieved_docs} 要求: - 回答必须基于提供的信息,不编造内容; - 语言简洁明了,贴合用户问题; - 若没有相关信息,回复"抱歉,暂时无法解答你的问题"。 用户问题:{query} """ ) def retrieve(self, query: str, top_k: int = 3) -> List[str]: """RAG检索相关文档""" docs = self.vector_db.similarity_search(query, k=top_k) return [doc.page_content for doc in docs] def generate_answer(self, query: str, context: List[Dict[str, str]], retrieved_docs: List[str]) -> str: """生成回答(结合上下文+检索文档)""" # 格式化上下文 context_str = "\n".join([f"{item['role']}:{item['content']}" for item in context]) # 格式化检索文档 docs_str = "\n".join([f"参考文档{i+1}:{doc}" for i, doc in enumerate(retrieved_docs)]) # 构建提示词 prompt = self.prompt_template.format( query=query, context=context_str, retrieved_docs=docs_str ) # 调用大模型生成回答 response = self.llm.invoke(prompt) return response.content
四、接口测试与部署准备
(一)本地接口测试
  1. 启动 FastAPI 服务器

bash

运行

uvicorn main:app --reload --host 0.0.0.0 --port 8000
  1. 访问自动生成的 API 文档(保姆级测试入口)
  • Swagger UI:http://localhost:8000/docs
  • ReDoc:http://localhost:8000/redoc
  1. 测试流程
  • 打开 Swagger UI,找到/api/chat/query接口,点击 “Try it out”
  • 输入请求示例:

json

{ "session_info": { "session_id": "test-session-001", "user_id": "user-001" }, "query": "什么是智能客服?", "context": [], "top_k": 3 }
  • 点击 “Execute”,查看返回的客服回答与上下文。
(二)部署准备(生产环境适配)
  1. 关闭调试模式,设置日志级别为 INFO
  2. 替换本地向量数据库(Chroma)为生产级数据库(如 Milvus、Pinecone)
  3. 大模型替换为企业级部署(如私有化部署的 Llama3、通义千问)
  4. 接口添加认证(如 API Key、JWT Token),避免未授权访问:

python

运行

# 添加API Key认证示例 from fastapi.security import APIKeyHeader from fastapi import Security, HTTPException API_KEY = "your-production-api-key" api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) def get_api_key(api_key: str = Security(api_key_header)): if api_key != API_KEY: raise HTTPException(status_code=401, detail="无效的API Key") return api_key # 给接口添加认证依赖 @app.post("/api/chat/query", response_model=QueryResponse, dependencies=[Depends(get_api_key)])
五、常见问题与避坑指南
问题现象核心原因解决方案
接口调用提示 “401 Unauthorized”生产环境未传递 API Key 或 Key 无效1. 前端请求头添加X-API-Key: 你的API Key;2. 核对 Key 是否与后端配置一致
大模型调用超时 / 失败网络问题(OpenAI API)或本地模型未启动1. 配置大模型超时时间(timeout=30);2. 切换为本地部署的大模型;3. 添加重试机制
上下文数据过大导致接口响应慢历史对话上下文未做长度限制1. 前端限制上下文长度(如仅传递最近 10 轮对话);2. 后端对上下文进行截断处理;3. 大文本数据压缩传输
向量数据库检索结果不准确嵌入模型与文档适配性差,或 TopK 设置不合理1. 更换更贴合业务的嵌入模型(如通义 Embedding);2. 调整 TopK 值(建议 3-5);3. 对文档进行预处理(分段、去重)
FastAPI 服务器启动失败(端口占用)8000 端口被其他程序占用1. 更换端口(--port 8080);2. 关闭占用端口的程序;3. 配置随机端口
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/4 2:29:46

结合深度学习与众包学习的医学图像多专家标注分割研究【附代码】

✅ 博主简介:擅长数据搜集与处理、建模仿真、程序设计、仿真代码、论文写作与指导,毕业论文、期刊论文经验交流。✅成品或者定制,扫描文章底部微信二维码。(1) 本研究针对医学图像分割中专家间标注差异导致的训练数据不一致性问题&#xff0c…

作者头像 李华
网站建设 2026/2/8 19:42:52

基于深度学习的前列腺超声图像超分辨率重建与分类研究【附代码】

✅ 博主简介:擅长数据搜集与处理、建模仿真、程序设计、仿真代码、论文写作与指导,毕业论文、期刊论文经验交流。✅成品或者定制,扫描文章底部微信二维码。(1) 融合U-Net的循环生成对抗网络前列腺超声图像超分辨率重建方法前列腺超声图像在临…

作者头像 李华
网站建设 2026/2/4 17:10:26

具身智能十年演进

下面给你一条从工程现实、系统能力与规模化落地视角出发的 「具身智能十年演进路线(2025–2035)」。 我会刻意避开“通用智能”“像人一样聪明”的叙事,聚焦哪些能力真的会发生跃迁、为什么、以及工程上意味着什么。一、核心判断(…

作者头像 李华
网站建设 2026/2/7 16:11:34

机器人系统软件架构十年演进

下面给你一条站在系统工程与长期演进视角的 「机器人系统软件架构十年演进路线(2025–2035)」。 这不是“ROS2 vs XXX”的对比,而是机器人软件架构如何从“能跑”进化为“可治理、可自治”的系统骨架。一、核心判断(一句话&#x…

作者头像 李华
网站建设 2026/2/1 5:51:10

全球机器人OS对比

全球主流机器人 OS(操作系统/平台)并不是“谁更强”的问题,而是“为哪一类机器人、哪一阶段、哪一种治理模式而生”。 下面我从系统定位、技术哲学、工程成熟度、长期演进能力四个维度,给你一份真正可用于选型与战略判断的全球机器…

作者头像 李华
网站建设 2026/2/8 10:56:33

稳定性质量系列-系统稳定性建设实践

开篇 在开始介绍服务稳定性之前,我们先聊一下 SLA。SLA(service-level agreement,即 服务级别协议)也称服务等级协议,经常被用来衡量服务稳定性指标。通常被称作“几个 9”,9 越多代表服务全年可用时间越长…

作者头像 李华