LangChain 与 LangGraph 的组件化能力,降低了 AI Agent 的原型构建门槛。但在项目迈向生产环境时,AI Agent 工程化落地还存在一些问题。今天想在这里分享一下,作者本人在实际生产环境中 AI Agent 开发过程中所遇到的问题和几点经验。
一切都以实际需求为准,那么就以实际需求为切入点,徐徐展开。
1. 多租户 API 配置的动态切换
需求场景
在 SaaS 场景下,把 Agent 封装成 API 服务对外提供时,会出现不同用户需要使用不同 LLM 配置的情况:
- 费用隔离- 企业客户自带 API Key,按自己的额度计费
- 数据安全- 部分用户要求接入私有化部署的模型
- A/B 测试- 同一服务对不同请求使用不同模型对比效果
除这三个常见的以外,可能还有其他的使用场景。总之,每个请求需要能够动态指定model、base_url和api_key。
LangChain 原生实现的问题
先看看 LangChain 原生是怎么做的:
from langchain_openai import ChatOpenAI # 初始化时就要绑定配置 llm = ChatOpenAI( model="gpt-4", api_key="sk-xxx", base_url="https://api.openai.com/v1" )这样写在单用户场景没问题,但放到多租户 Web 服务里就有几个问题:
- 配置在初始化时绑定,实例创建后无法切换:
ChatOpenAI的model、openai_api_base、openai_api_key都是构造参数,一旦实例创建完成,配置就固定了。如果要切换到另一个服务商或 API Key,就必须创建新实例 - 频繁创建实例:每来一个用户请求,就得
new一个新的ChatOpenAI实例 - HTTP 连接无法复用:每个实例会独立创建底层 HTTP 连接,高并发时连接数不可控,可能导致资源耗尽
解决方案
核心思路是Agent 复用,Client 按需创建,HTTP 连接池共享:
import httpx from openai import AsyncOpenAI # ========== 1. 全局 HTTP 连接池(单例)========== class GlobalHTTPFactory: _client = None @classmethod async def get_client(cls): if cls._client is None: cls._client = httpx.AsyncClient( limits=httpx.Limits(max_connections=50), http2=True ) return cls._client # ========== 2. Agent 基类 ========== class BaseAgent: def __init__(self, default_config=None): self.default_config = default_config or {} async def get_openai_client(self, runtime_config=None): # 合并配置:运行时配置覆盖默认配置 config = {**self.default_config, **runtime_config} if runtime_config else self.default_config # 复用全局 HTTP 连接池 shared_http = await GlobalHTTPFactory.get_client() return AsyncOpenAI( api_key=config["api_key"], base_url=config["base_url"], http_client=shared_http # 关键:注入共享的 HTTP 客户端 ) async def run(self, text, **runtime_config): client = await self.get_openai_client(runtime_config) # 调用 LLM... # ========== 3. Web 服务层 ========== # 服务启动时,创建一次 Agent agent = BaseAgent() @app.post("/chat") async def chat(request): # 每个请求传入自己的配置 return await agent.run( text=request.text, model=request.model, base_url=request.base_url, api_key=request.api_key )整个设计的要点:
- Agent 只创建一次,服务启动时初始化,全局复用
- OpenAI 客户端按需创建,每个请求根据传入的配置生成
- HTTP 连接池全局共享,通过
http_client参数注入,避免连接数爆炸,极大降低运行所需内存
为什么不用全局连接池内存会一直累加?
每次
new AsyncOpenAI()都会创建一个内部的 HTTP 连接池。如果请求完成后没有显式调用close(),底层连接不会立即释放。加上 Python GC 的延迟、HTTP Keep-alive 保持连接、异步对象可能存在的循环引用等因素,这些连接池会不断累积,最终导致内存持续增长甚至 OOM。使用全局连接池后,只维护一个实例,连接数有上限,问题解决。
2. 模型思考过程的流式透传
需求场景
DeepSeek-R1、GLM-4 等推理模型在生成最终答案之前,会先进行一段"思考"。这个思考过程对用户来说是有价值的——它能让用户看到模型是如何分析问题的,增强可信度。
把 Agent 封装成 API 服务时,需要把这个思考过程实时流式地透传给前端:
- 实时展示:用户能看到模型"正在思考",而不是干等
- 区分内容:前端需要区分"思考内容"和"最终回答",分别展示
- 多模型兼容:不同模型厂商的思考参数格式不同(DeepSeek、GLM、Qwen 各有各的写法)
问题在哪
OpenAI SDK 返回的流式 chunk 结构是这样的:
# 普通内容在 delta.content # 思考内容在 delta.reasoning_content(这是 DeepSeek 扩展的字段) chunk.choices[0].delta.content # 最终回答 chunk.choices[0].delta.reasoning_content # 思考过程问题是:
reasoning_content不是标准字段:LangChain 原生不认识这个字段,会直接丢弃- 不同模型参数不同:DeepSeek 用
reasoning_content,GLM 用thinking.type,Qwen 用enable_thinking - 需要手动解析 chunk:拿到 chunk 后,要自己判断是思考内容还是正式内容
解决方案
核心问题是:LangChain 的AIMessageChunk只保留content,会把reasoning_content丢掉。
解决思路是自定义消息类型 + 自定义 ChatModel,把 OpenAI 原始响应完整保留下来:
from langchain_core.messages import BaseMessageChunk from langchain_core.language_models.chat_models import BaseChatModel # ========== 1. 自定义消息类型,保留完整的 OpenAI 原始响应 ========== class ChatMessageChunk(BaseMessageChunk): """关键:把 OpenAI 的原始 chunk 完整保留下来""" chat_completion_chunk: Optional[ChatCompletionChunk] = None # ========== 2. 自定义 ChatModel,流式时用自定义消息类型包装 ========== class LLMClientChatModel(BaseChatModel): async def _astream(self, messages, **kwargs): async for chunk in self.client.astream(messages): # 用自定义消息包装,不让 LangChain 丢掉 reasoning_content message = ChatMessageChunk(content="", chat_completion_chunk=chunk) yield ChatGenerationChunk(message=message) # ========== 3. Agent 中解析 chunk,区分思考/内容 ========== class MyAgent: async def run_stream(self, text, **kwargs): async for event in self.graph.astream_events(inputs, config): if event["event"] == "on_chat_model_stream": chunk = event["data"]["chunk"] # 因为保留了完整 chunk,这里能拿到 reasoning_content delta = chunk.message.chat_completion_chunk.choices[0].delta if delta.reasoning_content: yield StreamChunk(type="thinking", content=delta.reasoning_content) elif delta.content: yield StreamChunk(type="content", content=delta.content) # ========== 4. 不同模型的思考参数统一管理 ========== class ThinkingConfig: def __init__(self): self.model_params = { "glm": {"thinking": {"type": "enabled"}}, "deepseek": {}, # DeepSeek 默认支持 "qwen": {"enable_thinking": True} } def get_params(self, model_name, enable): model_type = self._detect_model_type(model_name) return self.model_params.get(model_type, {}) if enable else {}整个设计的要点:
- 自定义消息类型:
ChatMessageChunk保留完整的ChatCompletionChunk,不让 LangChain 丢弃扩展字段 - 自定义 ChatModel:
LLMClientChatModel在流式输出时用自定义消息包装 - Agent 层解析:从保留的完整 chunk 中取出
reasoning_content,区分类型输出 - 统一配置管理:
ThinkingConfig封装不同模型的参数差异
3. 中间执行路径的可观测性
需求场景
以一个智能研报生成 Agent为例,它的执行流程比较复杂:
输入解析 → 多源数据采集(并行)→ 数据交叉验证 → 深度分析 → 观点提炼 → 报告生成 → 合规审查 → 输出 ↓ ↓ [财报API] [验证失败则重试] [新闻API] [行情API]用户调用 API 生成一份研报时,可能需要等待 30 秒以上。如果这期间前端只显示一个转圈动画,用户体验会很差。我们需要把执行进度实时透传出来:
- 实时反馈:用户能看到「正在采集财报数据 (2/3)」「正在进行深度分析」,而不是干等
- 并行任务状态:多个数据源并行采集时,分别展示各自的进度
- 条件分支可见:数据验证失败触发重试时,用户能知道发生了什么
- 调试定位:出问题时能快速定位是哪个节点、哪个数据源出错
问题在哪
LangGraph 的ainvoke()方法只返回最终结果,中间过程完全是黑盒:
# 只能拿到最终结果,中间 30 秒发生了什么完全不知道 result = await graph.ainvoke(inputs)虽然 LangGraph 提供了astream_events()方法,但它的事件类型很多,需要自己过滤和解析:
# astream_events 会抛出各种事件:on_chain_start, on_chain_end, on_chat_model_stream... # 需要自己判断哪些是节点事件,哪些是 LLM 事件,哪些是子图事件 async for event in graph.astream_events(inputs): # 怎么过滤?怎么区分主图和子图?怎么处理并行节点? pass解决方案
核心是监听on_chain_start/on_chain_end事件,结合节点元数据,输出结构化的进度信息:
from pydantic import BaseModel, Field from typing import Optional, Literal from enum import Enum # ========== 1. 统一的流式输出格式 ========== class ChunkType(str, Enum): PROCESSING = "processing" # 执行进度 THINKING = "thinking" # 思考过程 CONTENT = "content" # 正式内容 FINAL = "final" # 最终结果 ERROR = "error" # 错误信息 class StreamChunk(BaseModel): """流式输出块,前端按 type 分别处理""" type: ChunkType content: str metadata: Optional[dict] = None # ========== 2. 节点进度配置(可配置化)========== NODE_PROGRESS_MAP = { # 节点名 -> (进度描述, 预估耗时秒, 所属阶段) "parse_input": ("解析用户输入", 1, "准备阶段"), "fetch_financial_data": ("采集财报数据", 5, "数据采集"), "fetch_news_data": ("采集新闻资讯", 3, "数据采集"), "fetch_market_data": ("采集行情数据", 2, "数据采集"), "validate_data": ("交叉验证数据", 3, "数据处理"), "deep_analysis": ("深度分析", 10, "智能分析"), "extract_insights": ("提炼核心观点", 5, "智能分析"), "generate_report": ("生成研报内容", 8, "报告生成"), "compliance_check": ("合规性审查", 3, "质量保障"), } # ========== 3. Agent 中监听节点事件 ========== class ResearchReportAgent: def __init__(self): self.parallel_tasks = {} # 跟踪并行任务状态 self.retry_count = {} # 跟踪重试次数 async def run_stream(self, query: str, **kwargs): async for event in self.graph.astream_events(inputs, config): event_type = event.get("event", "") node_name = event.get("name", "") # ===== 节点开始事件 ===== if event_type == "on_chain_start": if node_name in NODE_PROGRESS_MAP: desc, est_time, stage = NODE_PROGRESS_MAP[node_name] # 处理重试场景 retry = self.retry_count.get(node_name, 0) retry_hint = f"(第 {retry + 1} 次尝试)" if retry > 0 else "" yield StreamChunk( type=ChunkType.PROCESSING, content=f"[{stage}] {desc}{retry_hint}...", metadata={ "node": node_name, "stage": stage, "estimated_seconds": est_time, "retry_count": retry } ) # 处理并行数据采集节点 elif node_name == "parallel_data_fetch": self.parallel_tasks = {"financial": "pending", "news": "pending", "market": "pending"} yield StreamChunk( type=ChunkType.PROCESSING, content="[数据采集] 正在并行采集多源数据...", metadata={"parallel_status": self.parallel_tasks} ) # ===== 节点结束事件 ===== elif event_type == "on_chain_end": # 更新并行任务状态 if node_name == "fetch_financial_data": self.parallel_tasks["financial"] = "completed" yield StreamChunk( type=ChunkType.PROCESSING, content="[数据采集] 财报数据采集完成 ✓", metadata={"parallel_status": self.parallel_tasks.copy()} ) # 处理验证失败触发重试的场景 elif node_name == "validate_data": output = event.get("data", {}).get("output", {}) if not output.get("is_valid", True): failed_sources = output.get("failed_sources", []) for src in failed_sources: self.retry_count[f"fetch_{src}_data"] = self.retry_count.get(f"fetch_{src}_data", 0) + 1 yield StreamChunk( type=ChunkType.PROCESSING, content=f"[数据处理] 验证未通过,{failed_sources} 将重新采集...", metadata={"retry_sources": failed_sources} ) # 图执行完毕,输出最终结果 elif node_name == "LangGraph": final_output = event["data"]["output"] yield StreamChunk( type=ChunkType.FINAL, content="", metadata={"report": final_output} ) # ===== LLM 流式输出事件 ===== elif event_type == "on_chat_model_stream": chunk = event["data"]["chunk"] delta = chunk.message.chat_completion_chunk.choices[0].delta if delta.reasoning_content: yield StreamChunk(type=ChunkType.THINKING, content=delta.reasoning_content) elif delta.content: yield StreamChunk(type=ChunkType.CONTENT, content=delta.content)前端拿到的流式输出会是这样:
{"type": "processing", "content": "[准备阶段] 解析用户输入...", "metadata": {"node": "parse_input", "stage": "准备阶段", "estimated_seconds": 1}} {"type": "processing", "content": "[数据采集] 正在并行采集多源数据...", "metadata": {"parallel_status": {"financial": "pending", "news": "pending", "market": "pending"}}} {"type": "processing", "content": "[数据采集] 采集新闻资讯...", "metadata": {"node": "fetch_news_data", "stage": "数据采集"}} {"type": "processing", "content": "[数据采集] 采集财报数据...", "metadata": {"node": "fetch_financial_data", "stage": "数据采集"}} {"type": "processing", "content": "[数据采集] 新闻数据采集完成 ✓", "metadata": {"parallel_status": {"financial": "pending", "news": "completed", "market": "pending"}}} {"type": "processing", "content": "[数据采集] 财报数据采集完成 ✓", "metadata": {"parallel_status": {"financial": "completed", "news": "completed", "market": "pending"}}} {"type": "processing", "content": "[数据采集] 行情数据采集完成 ✓", "metadata": {"parallel_status": {"financial": "completed", "news": "completed", "market": "completed"}}} {"type": "processing", "content": "[数据处理] 交叉验证数据...", "metadata": {"node": "validate_data"}} {"type": "processing", "content": "[数据处理] 验证未通过,['market'] 将重新采集...", "metadata": {"retry_sources": ["market"]}} {"type": "processing", "content": "[数据采集] 采集行情数据(第 2 次尝试)...", "metadata": {"node": "fetch_market_data", "retry_count": 1}} {"type": "processing", "content": "[智能分析] 深度分析...", "metadata": {"node": "deep_analysis", "estimated_seconds": 10}} {"type": "thinking", "content": "我需要从多个维度分析这家公司..."} {"type": "thinking", "content": "首先看财务数据,营收同比增长..."} {"type": "content", "content": "## 一、公司概况\n\n"} {"type": "content", "content": "该公司是国内领先的..."} {"type": "processing", "content": "[智能分析] 提炼核心观点...", "metadata": {"node": "extract_insights"}} {"type": "processing", "content": "[报告生成] 生成研报内容...", "metadata": {"node": "generate_report"}} {"type": "processing", "content": "[质量保障] 合规性审查...", "metadata": {"node": "compliance_check"}} {"type": "final", "content": "", "metadata": {"report": {"title": "XX公司深度研究报告", "sections": [...]}}}整个设计的要点:
- 统一输出格式:
StreamChunk定义type字段,前端按类型分别处理 - 节点配置可扩展:
NODE_PROGRESS_MAP集中管理节点描述,新增节点只需加一行配置 - 并行任务可追踪:通过
parallel_status字段,前端可以渲染多任务进度条 - 重试过程透明:验证失败、重新采集等异常流程对用户可见,增强可信度
- 预估时间可用:
estimated_seconds可用于前端渲染预估进度条
总结
本文分享了在开发open-pilot-agent过程中遇到的三个生产级挑战,以及对应的解决方案:
- 多租户 API 配置的动态切换:通过 Agent 复用 + Client 按需创建 + HTTP 连接池共享,解决配置动态切换和资源管理问题
- 模型思考过程的流式透传:通过自定义消息类型和 ChatModel,保留 OpenAI 原始响应中的
reasoning_content字段 - 中间执行路径的可观测性:通过监听 LangGraph 的
astream_events,将节点执行进度实时透传给前端
学习资源推荐
如果你想更深入地学习大模型,以下是一些非常有价值的学习资源,这些资源将帮助你从不同角度学习大模型,提升你的实践能力。
一、全套AGI大模型学习路线
AI大模型时代的学习之旅:从基础到前沿,掌握人工智能的核心技能!
因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获取
二、640套AI大模型报告合集
这套包含640份报告的合集,涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师,还是对AI大模型感兴趣的爱好者,这套报告合集都将为您提供宝贵的信息和启示
因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获取
三、AI大模型经典PDF籍
随着人工智能技术的飞速发展,AI大模型已经成为了当今科技领域的一大热点。这些大型预训练模型,如GPT-3、BERT、XLNet等,以其强大的语言理解和生成能力,正在改变我们对人工智能的认识。 那以下这些PDF籍就是非常不错的学习资源。
因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获取
四、AI大模型商业化落地方案
作为普通人,入局大模型时代需要持续学习和实践,不断提高自己的技能和认知水平,同时也需要有责任感和伦理意识,为人工智能的健康发展贡献力量。