news 2026/1/26 22:58:03

(10-6-01)基于MCP实现的多智能体协同系统:应用整合模块(1)为MCP服务的路由配置

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
(10-6-01)基于MCP实现的多智能体协同系统:应用整合模块(1)为MCP服务的路由配置

10.6 应用整合模块

在本项目中,“agent_mcp/app”目录应用于实现应用程序的核心逻辑与流程管理,承载着应用的主要功能实现,包括应用的初始化、路由配置以处理各类请求,以及服务生命周期的管理等,为整个项目的应用层提供基础架构和核心业务逻辑支持,是应用运行和功能交互的核心场所。

10.6.1 为MCP服务的路由配置

文件agent_mcp/app/routes.py是本项目中负责API路由配置的核心文件,与MCP系统的前端交互和数据流转紧密关联。它定义了一系列API 端点,涵盖系统状态查询、代理与任务数据管理、图形化数据展示、节点详情获取、内存(上下文)增删改查等功能,通过调用MCP核心模块(如认证、数据库操作、工具实现等)处理请求,实现了前端与后端的数据交互,为MCP系统的 dashboard 展示、代理管理、任务管控等核心功能提供了接口支撑,是MCP系统中连接前端界面与后端业务逻辑的关键桥梁。

(1)下面代码的功能是提供系统状态查询API接口。原理是处理HTTP请求,验证请求方法,通过调用数据库操作函数获取活跃代理和任务数据,统计不同状态的任务数量,生成包含服务器运行状态、代理数量、任务数量及最后更新时间的JSON响应,为前端提供系统整体状态信息。

async def simple_status_api_route(request: Request) -> JSONResponse: # 处理CORS预请求的OPTIONS方法 if request.method == 'OPTIONS': return await handle_options(request) try: # 获取系统状态 from ..db.actions.agent_db import get_all_active_agents_from_db from ..db.actions.task_db import get_all_tasks_from_db agents = get_all_active_agents_from_db() # 从数据库获取所有活跃代理 tasks = get_all_tasks_from_db() # 从数据库获取所有任务 # 统计任务状态数量 pending_tasks = len([t for t in tasks if t.get('status') == 'pending']) # 待处理任务数 completed_tasks = len([t for t in tasks if t.get('status') == 'completed']) # 已完成任务数 return JSONResponse({ "server_running": True, # 服务器运行状态 "total_agents": len(agents), # 总代理数 "active_agents": len([a for a in agents if a.get('status') == 'active']), # 活跃代理数 "total_tasks": len(tasks), # 总任务数 "pending_tasks": pending_tasks, # 待处理任务数 "completed_tasks": completed_tasks, # 已完成任务数 "last_updated": datetime.datetime.now().isoformat() # 最后更新时间 }) except Exception as e: logger.error(f"simple_status_api_route 错误: {e}", exc_info=True) return JSONResponse({"error": f"获取简单状态失败: {str(e)}"}, status_code=500)

(2)下面代码的功能是提供节点详情查询API接口。原理是接收请求中的节点ID参数,根据节点ID解析节点类型,通过数据库查询获取对应类型节点(代理、任务、上下文、文件、管理员)的详细数据、相关操作记录和关联信息,生成JSON响应返回给前端,支持前端展示节点的详细信息。

async def node_details_api_route(request: Request) -> JSONResponse: node_id = request.query_params.get('node_id') # 从请求参数获取节点ID if not node_id: return JSONResponse({'error': '缺少 node_id 参数'}, status_code=400) # 初始化节点详情字典 details: Dict[str, Any] = {'id': node_id, 'type': 'unknown', 'data': {}, 'actions': [], 'related': {}} conn = None try: conn = get_db_connection() cursor = conn.cursor() # 从节点ID解析节点类型和实际ID parts = node_id.split('_', 1) node_type_from_id = parts[0] if len(parts) > 1 else node_id actual_id_from_node = parts[1] if len(parts) > 1 else (node_id if node_type_from_id != 'admin' else 'admin') details['type'] = node_type_from_id # 根据节点类型查询详情 if node_type_from_id == 'agent': # 代理节点 cursor.execute("SELECT * FROM agents WHERE agent_id = ?", (actual_id_from_node,)) row = cursor.fetchone() if row: details['data'] = dict(row) # 查询代理相关操作记录 cursor.execute("SELECT timestamp, action_type, task_id, details FROM agent_actions WHERE agent_id = ? ORDER BY timestamp DESC LIMIT 10", (actual_id_from_node,)) details['actions'] = [dict(r) for r in cursor.fetchall()] # 查询代理分配的任务 cursor.execute("SELECT task_id, title, status, priority FROM tasks WHERE assigned_to = ? ORDER BY created_at DESC LIMIT 10", (actual_id_from_node,)) details['related']['assigned_tasks'] = [dict(r) for r in cursor.fetchall()] # 其他节点类型(任务、上下文、文件、管理员)的查询逻辑类似... if not details.get('data') and node_type_from_id not in ['admin']: return JSONResponse({'error': '节点数据未找到或类型不识别'}, status_code=404) except Exception as e: logger.error(f"获取节点 {node_id} 详情错误: {e}", exc_info=True) return JSONResponse({'error': f'获取节点详情失败: {str(e)}'}, status_code=500) finally: if conn: conn.close() # 确保数据库连接关闭 return JSONResponse(details)

(3)下面代码的功能是提供创建代理的API接口。原理是处理POST请求,验证管理员身份,解析请求中的代理ID、能力等参数,调用代理创建工具实现函数创建代理,解析工具返回结果,生成包含创建状态和代理令牌的JSON响应,支持通过前端界面创建新代理。

async def create_agent_dashboard_api_route(request: Request) -> JSONResponse: """用于创建代理的仪表盘API端点,内部调用管理员工具。""" if request.method != 'POST': return JSONResponse({"error": "方法不允许"}, status_code=405) try: data = await get_sanitized_json_body(request) # 获取并验证请求体JSON数据 admin_auth_token = data.get("token") # 管理员认证令牌 agent_id = data.get("agent_id") # 要创建的代理ID capabilities = data.get("capabilities", []) # 代理能力(可选) working_directory = data.get("working_directory") # 工作目录(可选) # 验证管理员身份 if not verify_token(admin_auth_token, "admin"): return JSONResponse({"message": "未授权: API调用的管理员令牌无效"}, status_code=401) if not agent_id: # 检查代理ID是否存在 return JSONResponse({"message": "代理ID为必需项"}, status_code=400) # 准备工具函数的参数 tool_args = { "token": admin_auth_token, # 工具实现会再次验证此令牌 "agent_id": agent_id, "capabilities": capabilities, "working_directory": working_directory } # 调用已重构的工具实现函数 result_list: List[mcp_types.TextContent] = await create_agent_tool_impl(tool_args) # 处理工具返回结果,构建JSON响应 if result_list and result_list[0].text.startswith(f"代理 '{agent_id}' 创建成功。"): # 从结果中提取代理令牌(用于仪表盘显示) agent_token_from_result = None for line in result_list[0].text.split('\n'): if line.startswith("令牌: "): agent_token_from_result = line.split("令牌: ", 1)[1] break return JSONResponse({ "message": f"代理 '{agent_id}' 通过仪表盘API创建成功。", "agent_token": agent_token_from_result # 可能为None(如果解析失败) }) else: # 返回工具返回的错误信息 error_message = result_list[0].text if result_list else "创建代理时发生未知错误。" status_code = 400 # 默认错误状态码 if "未授权" in error_message: status_code = 401 if "已存在" in error_message: status_code = 409 # 冲突 return JSONResponse({"message": error_message}, status_code=status_code) except ValueError as e_val: # 来自get_sanitized_json_body的错误 return JSONResponse({"message": str(e_val)}, status_code=400) except Exception as e: logger.error(f"create_agent_dashboard_api_route 错误: {e}", exc_info=True) return JSONResponse({"message": f"通过仪表盘API创建代理时出错: {str(e)}"}, status_code=500)

(4)下面代码的功能是提供全量数据查询API接口。原理是处理GET 请求,连接数据库,查询所有代理(含令牌)、任务、上下文条目、最近操作记录和文件元数据,整合全局文件映射和管理员令牌等信息,生成包含所有关键数据的JSON响应,支持前端缓存数据以减少请求次数。

async def all_data_api_route(request: Request) -> JSONResponse: """一次调用获取所有数据,用于前端缓存""" if request.method == 'OPTIONS': return await handle_options(request) conn = None try: conn = get_db_connection() cursor = conn.cursor() # 获取所有代理及其令牌 cursor.execute("SELECT * FROM agents ORDER BY created_at DESC") agents_data = [] for row in cursor.fetchall(): agent_dict = dict(row) agent_id = agent_dict['agent_id'] # 从活跃代理中查找该代理的令牌 agent_token = None for token, data in g.active_agents.items(): if data.get("agent_id") == agent_id and data.get("status") != "terminated": agent_token = token break agent_dict['auth_token'] = agent_token agents_data.append(agent_dict) # 添加管理员作为特殊代理 agents_data.insert(0, { 'agent_id': 'Admin', 'status': 'system', 'auth_token': g.admin_token, 'created_at': 'N/A', 'current_task': 'N/A' }) # 获取所有任务 cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC") tasks_data = [dict(row) for row in cursor.fetchall()] # 获取所有上下文条目 cursor.execute("SELECT * FROM project_context ORDER BY last_updated DESC") context_data = [dict(row) for row in cursor.fetchall()] # 获取最近的代理操作(最近100条) cursor.execute(""" SELECT * FROM agent_actions ORDER BY timestamp DESC LIMIT 100 """) actions_data = [dict(row) for row in cursor.fetchall()] # 获取文件元数据 cursor.execute("SELECT * FROM file_metadata") file_metadata = [dict(row) for row in cursor.fetchall()] # 构建响应数据 response_data = { "agents": agents_data, "tasks": tasks_data, "context": context_data, "actions": actions_data, "file_metadata": file_metadata, "file_map": g.file_map, "admin_token": g.admin_token, "timestamp": datetime.datetime.now().isoformat() } return JSONResponse( response_data, headers={ # 设置CORS头 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'GET, OPTIONS', 'Access-Control-Allow-Headers': 'Content-Type' } ) except Exception as e: logger.error(f"获取所有数据错误: {e}", exc_info=True) return JSONResponse({"error": f"获取所有数据失败: {str(e)}"}, status_code=500) finally: if conn: conn.close() # 确保数据库连接关闭
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/26 17:29:36

终极指南:在Windows 11上完美运行安卓应用的完整配置方案

终极指南:在Windows 11上完美运行安卓应用的完整配置方案 【免费下载链接】WSA Developer-related issues and feature requests for Windows Subsystem for Android 项目地址: https://gitcode.com/gh_mirrors/ws/WSA 想要在Windows 11电脑上无缝使用海量安…

作者头像 李华
网站建设 2026/1/26 20:54:32

猫抓cat-catch:浏览器资源嗅探工具的终极使用指南

猫抓cat-catch:浏览器资源嗅探工具的终极使用指南 【免费下载链接】cat-catch 猫抓 chrome资源嗅探扩展 项目地址: https://gitcode.com/GitHub_Trending/ca/cat-catch 还在为无法下载网页视频而烦恼吗?猫抓cat-catch这款强大的浏览器资源嗅探工具…

作者头像 李华
网站建设 2026/1/23 15:58:51

Universal x86 Tuning Utility:释放硬件潜能的智能调优方案

Universal x86 Tuning Utility:释放硬件潜能的智能调优方案 【免费下载链接】Universal-x86-Tuning-Utility Unlock the full potential of your Intel/AMD based device. 项目地址: https://gitcode.com/gh_mirrors/un/Universal-x86-Tuning-Utility 想要充…

作者头像 李华
网站建设 2026/1/25 12:42:11

5分钟搞定网盘直链下载:免客户端高速下载终极方案

还在为网盘限速烦恼吗?还在纠结要不要购买昂贵的会员服务吗?网盘直链下载助手为您提供完美的解决方案!这款免费开源工具能够将百度网盘、阿里云盘等主流网盘的文件分享链接转换为真实下载地址,让您无需安装官方客户端即可实现高速…

作者头像 李华
网站建设 2026/1/24 19:19:32

XUnity.AutoTranslator终极指南:3分钟实现游戏无障碍汉化

XUnity.AutoTranslator终极指南:3分钟实现游戏无障碍汉化 【免费下载链接】XUnity.AutoTranslator 项目地址: https://gitcode.com/gh_mirrors/xu/XUnity.AutoTranslator 还在为心爱的海外游戏语言不通而困扰吗?XUnity.AutoTranslator作为一款功…

作者头像 李华