news 2026/1/2 11:24:09

(10-5-02)基于MCP实现的多智能体协同系统:基于MCP的代理通信模块

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
(10-5-02)基于MCP实现的多智能体协同系统:基于MCP的代理通信模块

10.4.2 基于MCP的代理通信模块

文件agent_mcp/tools/agent_communication_tools.py是本项目中基于MCP协议的代理通信工具模块,主要功能是实现代理间的安全、规范通信。此文件提供了如下所示的三大核心功能:

  1. 代理间消息发送:通过权限校验(如管理员可与所有代理通信、普通代理仅能与活跃代理通信等)确保通信合法性,支持文本、协助请求等多种消息类型,可通过tmux会话实时投递或存储待取,并记录消息状态。
  2. 消息检索:代理可以获取发送或接收的消息,支持按类型、已读状态等筛选,且能自动标记已读消息。
  3. 管理员广播:允许管理员向所有活跃代理发送消息,适用于系统通知等场景。同时,所有通信操作均记录到数据库和审计日志,确保可追溯,为多代理协作提供了可靠的信息交互支撑。

(1)下面代码的功能是检查两个代理是否允许通信,确保代理间交互符合系统规则。原理是基于发送者身份(是否为管理员)、发送者与接收者是否为同一代理、接收者是否为管理员代理以及双方是否均处于活跃状态等条件,判断通信权限并返回结果及原因。

def _can_agents_communicate(sender_id: str, recipient_id: str, is_admin: bool) -> tuple[bool, str]: """ 检查两个代理是否允许通信。 参数: sender_id: 发送代理的ID recipient_id: 接收代理的ID is_admin: 发送者是否具有管理员权限 返回: 包含(是否允许: bool, 原因: str)的元组 """ # 管理员可以与任何代理通信 if is_admin: return True, "管理员权限" # 不允许自我通信(应使用内部方法) if sender_id == recipient_id: return False, "不允许自我通信" # 管理员代理始终可以被联系 if recipient_id == "admin" or recipient_id.lower().startswith("admin"): return True, "管理员代理始终可被联系" # 检查接收者是否允许来自发送者的通信 # 此处可扩展为数据库中的权限系统 # 目前使用简单规则:两个代理均活跃时可通信 if sender_id in g.active_agents and recipient_id in g.active_agents: return True, "两个代理均处于活跃状态" # 检查代理是否处于同一任务上下文中 # (这需要额外的任务关系检查) return False, "这些代理之间不允许通信"

(2)下面代码的功能是实现代理间消息发送,支持权限校验和多种投递方式。原理是先验证发送者身份和消息合法性,通过通信权限检查后,生成唯一消息ID并存储到数据库,再根据指定方式(tmux实时投递、存储待取或两者结合)尝试投递消息,记录投递状态并返回结果。

async def send_agent_message_tool_impl(arguments: Dict[str, Any]) -> List[mcp_types.TextContent]: """ 带权限检查的代理间消息发送。 消息可通过tmux会话投递或存储供后续检索。 """ sender_token = arguments.get("token") recipient_id = arguments.get("recipient_id") message_content = arguments.get("message") message_type = arguments.get("message_type", "text") # 文本、协助请求、任务更新 priority = arguments.get("priority", "normal") # 低、正常、高、紧急 deliver_method = arguments.get("deliver_method", "tmux") # tmux、存储、两者皆有 # 身份验证 sender_id = get_agent_id(sender_token) if not sender_id: return [mcp_types.TextContent(type="text", text="未授权:需要有效令牌")] # 验证 if not recipient_id or not message_content: return [mcp_types.TextContent(type="text", text="错误:需要recipient_id和message")] if len(message_content) > 4000: # 合理的消息大小限制 return [mcp_types.TextContent(type="text", text="错误:消息过长(最大4000字符)")] # 停止命令仅限管理员使用 is_admin = verify_token(sender_token, "admin") if message_type == "stop_command" and not is_admin: return [mcp_types.TextContent(type="text", text="错误:仅管理员可发送停止命令")] # 权限检查 can_communicate, reason = _can_agents_communicate(sender_id, recipient_id, is_admin) if not can_communicate: return [mcp_types.TextContent(type="text", text=f"通信被拒绝:{reason}")] # 创建消息数据 message_id = _generate_message_id() timestamp = datetime.datetime.now().isoformat() message_data = { "message_id": message_id, "sender_id": sender_id, "recipient_id": recipient_id, "message_content": message_content, "message_type": message_type, "priority": priority, "timestamp": timestamp, "delivered": False, "read": False } conn = None try: conn = get_db_connection() cursor = conn.cursor() # 将消息存储到数据库 cursor.execute(""" INSERT INTO agent_messages (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, False, False)) # 根据方式尝试投递 delivery_status = "stored" # 默认状态:已存储

(3)下面代码的功能是检索代理的消息,支持多种筛选条件。原理是验证代理身份后,根据是否包含发送/接收消息、消息类型、未读状态等筛选条件从数据库查询消息,可自动标记接收的消息为已读,最后格式化消息列表并返回。

async def get_agent_messages_tool_impl(arguments: Dict[str, Any]) -> List[mcp_types.TextContent]: """ 检索代理的消息。 """ agent_token = arguments.get("token") include_sent = arguments.get("include_sent", False) include_received = arguments.get("include_received", True) mark_as_read = arguments.get("mark_as_read", True) limit = arguments.get("limit", 20) message_type_filter = arguments.get("message_type") unread_only = arguments.get("unread_only", False) # 身份验证 agent_id = get_agent_id(agent_token) if not agent_id: return [mcp_types.TextContent(type="text", text="未授权:需要有效令牌")] # 验证limit参数 try: limit = int(limit) if not (1 <= limit <= 100): limit = 20 except (ValueError, TypeError): limit = 20 conn = None try: conn = get_db_connection() cursor = conn.cursor() # 构建查询 query_conditions = [] query_params = [] if include_received and include_sent: query_conditions.append("(recipient_id = ? OR sender_id = ?)") query_params.extend([agent_id, agent_id]) elif include_received: query_conditions.append("recipient_id = ?") query_params.append(agent_id) elif include_sent: query_conditions.append("sender_id = ?") query_params.append(agent_id) else: return [mcp_types.TextContent(type="text", text="错误:必须包含发送或接收的消息")] if message_type_filter: query_conditions.append("message_type = ?") query_params.append(message_type_filter) if unread_only: query_conditions.append("read = ?") query_params.append(False) where_clause = " AND ".join(query_conditions) query = f""" SELECT message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read FROM agent_messages WHERE {where_clause} ORDER BY timestamp DESC LIMIT ? """ query_params.append(limit) cursor.execute(query, query_params) messages = cursor.fetchall() # 如果需要,将接收的消息标记为已读 if mark_as_read and include_received: message_ids_to_mark = [msg["message_id"] for msg in messages if msg["recipient_id"] == agent_id and not msg["read"]] if message_ids_to_mark: placeholders = ",".join("?" * len(message_ids_to_mark)) cursor.execute(f"UPDATE agent_messages SET read = ? WHERE message_id IN ({placeholders})", [True] + message_ids_to_mark) conn.commit()
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2025/12/29 11:18:57

WAN2.2-14B-Rapid-AllInOne:让8GB显存设备也能玩转AI视频创作

WAN2.2-14B-Rapid-AllInOne&#xff1a;让8GB显存设备也能玩转AI视频创作 【免费下载链接】WAN2.2-14B-Rapid-AllInOne 项目地址: https://ai.gitcode.com/hf_mirrors/Phr00t/WAN2.2-14B-Rapid-AllInOne 还在为AI视频生成的高硬件门槛而苦恼吗&#xff1f;WAN2.2-14B-R…

作者头像 李华
网站建设 2025/12/29 11:18:38

CEM-1板材电气绝缘性能测试方法-工程师实操指南

作为 PCB 工程师&#xff0c;我们不仅要懂 CEM-1 板材的绝缘性能指标&#xff0c;还要掌握正确的测试方法&#xff0c;这样才能验证基材和成品 PCB 是否符合设计要求。今天就给大家详细介绍 CEM-1 板材电气绝缘性能的三种核心测试方法&#xff0c;包括原理、步骤和注意事项&…

作者头像 李华
网站建设 2026/1/2 0:23:05

ComfyUI Portrait Master中文版:从零开始打造专业级肖像生成工作流

ComfyUI Portrait Master中文版&#xff1a;从零开始打造专业级肖像生成工作流 【免费下载链接】comfyui-portrait-master-zh-cn 肖像大师 中文版 comfyui-portrait-master 项目地址: https://gitcode.com/gh_mirrors/co/comfyui-portrait-master-zh-cn 你是否曾经为生成…

作者头像 李华
网站建设 2025/12/29 11:17:39

终极指南:Doom Emacs中异步进程管理引发的性能瓶颈与优化策略

终极指南&#xff1a;Doom Emacs中异步进程管理引发的性能瓶颈与优化策略 【免费下载链接】doomemacs 项目地址: https://gitcode.com/gh_mirrors/doo/doom-emacs 问题背景 在大型项目开发环境中&#xff0c;使用Doom Emacs的开发者经常遇到编辑器响应迟缓的问题&…

作者头像 李华
网站建设 2025/12/29 11:17:25

设计模式:工厂模式概要

目录 一、工厂模式的核心分类 二、1. 简单工厂模式 核心思想 适用场景 无人售货柜项目案例&#xff1a;支付渠道创建 步骤 1&#xff1a;定义产品接口 步骤 2&#xff1a;实现具体产品 步骤 3&#xff1a;创建简单工厂类 步骤 4&#xff1a;客户端调用 优缺点 三、2…

作者头像 李华
网站建设 2026/1/2 8:28:00

深入计算机世界:编码原理终极指南与学习路径

深入计算机世界&#xff1a;编码原理终极指南与学习路径 【免费下载链接】编码---隐匿在计算机软硬件背后的语言.上高清PDF下载 《编码---隐匿在计算机软硬件背后的语言.上》 高清 PDF 下载 项目地址: https://gitcode.com/open-source-toolkit/2c344 探索计算机软硬件背…

作者头像 李华