AI Agent 多任务处理:并行编排、状态隔离与失败恢复的工程实践
一、从串行到并行:Agent 系统的任务瓶颈
在 AI Agent 系统中,单任务串行执行是最简单的实现方式,但也是性能最差的。一个典型的数据处理 Agent 流水线包含:数据获取、清洗、分析、报告生成四个步骤。串行执行时,每个步骤必须等前一步完成,总耗时是各步骤之和。
实际生产中的痛点更具体:一个金融风控 Agent 需要同时调用征信查询、行为分析、关联网络扫描三个子任务。串行执行耗时约 12 秒,而用户对风控决策的容忍上限是 3 秒。并行执行三个子任务后,总耗时降至最慢子任务的耗时(约 2.8 秒),满足 SLA 要求。
但并行编排引入了新的工程问题:子任务之间的状态如何隔离?部分失败时如何恢复?结果如何聚合?这些问题的处理质量直接决定 Agent 系统在生产环境的可靠性。
二、并行编排的架构模型:Fan-out/Fan-in 与 DAG 调度
Agent 多任务处理的核心架构是Fan-out/Fan-in模式:将一个主任务拆分为多个子任务并行执行(Fan-out),然后收集结果并聚合(Fan-in)。更复杂的场景需要 DAG(有向无环图)调度,处理子任务之间的依赖关系。
graph TB A[主任务:风控决策] --> B[子任务1:征信查询] A --> C[子任务2:行为分析] A --> D[子任务3:关联网络扫描] B --> E[结果聚合器] C --> E D --> E E --> F{全部成功?} F -->|是| G[生成风控报告] F -->|否| H[失败恢复策略] H --> I[重试 / 降级 / 部分结果] style A fill:#e1f5fe style E fill:#fff3e0 style F fill:#ffebee style G fill:#e8f5e9关键设计要素:
- 状态隔离:每个子任务拥有独立的上下文(Context),互不污染。子任务的中间状态通过消息传递而非共享内存交互。
- 超时控制:每个子任务设置独立超时,避免单个慢任务拖垮整体。主任务设置全局超时作为兜底。
- 失败策略:区分可重试失败(网络超时、限流)和不可重试失败(参数错误、权限拒绝),分别处理。
三、生产级代码:基于 Python 的并行 Agent 编排框架
以下代码实现了一个支持并行编排、状态隔离、失败恢复的 Agent 任务调度器。
import asyncio import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Coroutine import logging logger = logging.getLogger(__name__) class TaskStatus(Enum): PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" TIMEOUT = "timeout" @dataclass class TaskResult: """子任务执行结果,包含状态、返回值和元信息""" task_id: str status: TaskStatus value: Any = None error: str | None = None duration_ms: float = 0 retry_count: int = 0 @dataclass class TaskSpec: """子任务定义,包含执行函数、超时和重试策略""" task_id: str func: Callable[..., Coroutine] args: tuple = () kwargs: dict = field(default_factory=dict) timeout_sec: float = 10.0 max_retries: int = 2 retry_delay_sec: float = 1.0 # 是否为关键任务:关键任务失败则整个编排失败 critical: bool = True class AgentOrchestrator: """Agent 多任务并行编排器,支持状态隔离与失败恢复""" def __init__(self, global_timeout_sec: float = 30.0): self.global_timeout_sec = global_timeout_sec self._results: dict[str, TaskResult] = {} async def execute_parallel( self, tasks: list[TaskSpec] ) -> dict[str, TaskResult]: """并行执行多个子任务,返回每个任务的结果""" start = time.monotonic() # 为每个子任务创建独立的 asyncio.Task,实现状态隔离 coroutines = [self._execute_with_retry(t) for t in tasks] # 使用 asyncio.gather 收集结果,不因单个失败取消其他任务 results = await asyncio.gather(*coroutines, return_exceptions=True) for i, result in enumerate(results): task_id = tasks[i].task_id if isinstance(result, Exception): # gather 捕获的异常,转为 TaskResult self._results[task_id] = TaskResult( task_id=task_id, status=TaskStatus.FAILED, error=str(result), duration_ms=(time.monotonic() - start) * 1000, ) else: self._results[task_id] = result # 检查关键任务是否全部成功 critical_failed = [ t.task_id for t in tasks if t.critical and self._results.get(t.task_id) and self._results[t.task_id].status != TaskStatus.SUCCESS ] if critical_failed: logger.error( "关键任务失败: %s,编排终止", critical_failed ) return self._results async def _execute_with_retry(self, spec: TaskSpec) -> TaskResult: """带重试和超时的单任务执行""" start = time.monotonic() last_error: str | None = None for attempt in range(spec.max_retries + 1): try: # 每次重试使用独立的超时控制 value = await asyncio.wait_for( spec.func(*spec.args, **spec.kwargs), timeout=spec.timeout_sec, ) return TaskResult( task_id=spec.task_id, status=TaskStatus.SUCCESS, value=value, duration_ms=(time.monotonic() - start) * 1000, retry_count=attempt, ) except asyncio.TimeoutError: last_error = f"超时({spec.timeout_sec}s)" logger.warning( "任务 %s 第 %d 次超时", spec.task_id, attempt + 1 ) except Exception as e: last_error = str(e) logger.warning( "任务 %s 第 %d 次失败: %s", spec.task_id, attempt + 1, e, ) # 重试前等待(最后一次不需要) if attempt < spec.max_retries: await asyncio.sleep(spec.retry_delay_sec) return TaskResult( task_id=spec.task_id, status=TaskStatus.FAILED, error=last_error, duration_ms=(time.monotonic() - start) * 1000, retry_count=spec.max_retries, ) def get_summary(self) -> dict[str, Any]: """获取执行摘要,用于监控和日志""" total = len(self._results) success = sum( 1 for r in self._results.values() if r.status == TaskStatus.SUCCESS ) return { "total": total, "success": success, "failed": total - success, "success_rate": f"{success / total:.1%}" if total else "N/A", "details": { tid: {"status": r.status.value, "duration_ms": round(r.duration_ms)} for tid, r in self._results.items() }, } # ========== 使用示例:风控决策 Agent ========== async def query_credit(user_id: str) -> dict: """模拟征信查询""" await asyncio.sleep(1.5) return {"user_id": user_id, "score": 720, "level": "A"} async def analyze_behavior(user_id: str) -> dict: """模拟行为分析""" await asyncio.sleep(2.0) return {"user_id": user_id, "risk_tags": [], "anomaly_score": 0.12} async def scan_network(user_id: str) -> dict: """模拟关联网络扫描""" await asyncio.sleep(2.5) return {"user_id": user_id, "linked_accounts": 3, "fraud_score": 0.05} async def main(): orchestrator = AgentOrchestrator(global_timeout_sec=15.0) tasks = [ TaskSpec( task_id="credit_query", func=query_credit, args=("user_001",), timeout_sec=5.0, critical=True, ), TaskSpec( task_id="behavior_analysis", func=analyze_behavior, args=("user_001",), timeout_sec=5.0, critical=True, ), TaskSpec( task_id="network_scan", func=scan_network, args=("user_001",), timeout_sec=5.0, # 非关键任务:失败不影响整体决策 critical=False, ), ] results = await orchestrator.execute_parallel(tasks) summary = orchestrator.get_summary() logger.info("执行摘要: %s", summary) if __name__ == "__main__": asyncio.run(main())核心设计点:
- 状态隔离:每个子任务通过独立的
asyncio.wait_for执行,超时互不影响。TaskResult数据结构独立存储每个任务的状态。 - 失败恢复:通过
max_retries和retry_delay_sec控制重试策略。区分关键任务和非关键任务,非关键任务失败不阻断整体流程。 - 结果聚合:
asyncio.gather(return_exceptions=True)确保单个任务异常不会取消其他任务,所有结果统一收集。
四、并行编排的代价与适用边界
4.1 并行度不是越高越好
并行子任务数过多会导致资源竞争。LLM API 调用通常有并发限制(如 OpenAI 的 RPM/TPM 限制),无限制并行会触发限流,反而增加重试开销。建议根据 API 的并发上限设置信号量(asyncio.Semaphore)控制并发度。
4.2 子任务间有依赖时需 DAG 调度
Fan-out/Fan-in 模型假设子任务之间无依赖。如果子任务 B 依赖子任务 A 的输出,需要 DAG 调度器(如 Prefect、Airflow 的思路)。DAG 调度的工程复杂度显著高于简单并行,需要处理拓扑排序、循环依赖检测、中间结果传递等问题。
4.3 部分失败的业务语义
非关键任务失败后,聚合结果中缺少该部分数据。下游消费方必须能处理不完整结果,否则需要降级策略(如用默认值填充)。这个业务语义问题无法在编排层解决,必须在 Agent 的业务逻辑层设计。
4.4 可观测性要求
并行任务的调试难度远高于串行。每个子任务的开始时间、结束时间、重试次数必须记录,否则生产事故排查无从下手。get_summary()方法提供基础信息,生产环境应接入分布式追踪(如 OpenTelemetry)。
4.5 适用与禁用场景
| 场景 | 是否适用 | 原因 |
|---|---|---|
| 多个独立 API 并行调用 | 适用 | 无依赖,收益明确 |
| LLM 多轮对话 | 不适用 | 前后轮次有依赖 |
| 数据 ETL 流水线 | 看情况 | 有依赖时需 DAG |
| 实时决策(SLA < 1s) | 适用 | 并行缩短耗时 |
| 批量离线处理 | 不适用 | 串行更简单,并行收益低 |
五、总结
Agent 多任务并行编排的核心收益是降低整体耗时,核心代价是工程复杂度和调试难度。Fan-out/Fan-in 模型适合子任务无依赖的场景,有依赖时需引入 DAG 调度。工程实现上,状态隔离、超时控制、失败恢复、关键任务标记是四个必须覆盖的要素。并行度需要根据下游 API 的并发限制做约束,避免触发限流。可观测性不是可选项,而是生产环境的基本要求。