10.4.2 基于MCP的代理通信模块
文件agent_mcp/tools/agent_communication_tools.py是本项目中基于MCP协议的代理通信工具模块,主要功能是实现代理间的安全、规范通信。此文件提供了如下所示的三大核心功能:
- 代理间消息发送:通过权限校验(如管理员可与所有代理通信、普通代理仅能与活跃代理通信等)确保通信合法性,支持文本、协助请求等多种消息类型,可通过tmux会话实时投递或存储待取,并记录消息状态。
- 消息检索:代理可以获取发送或接收的消息,支持按类型、已读状态等筛选,且能自动标记已读消息。
- 管理员广播:允许管理员向所有活跃代理发送消息,适用于系统通知等场景。同时,所有通信操作均记录到数据库和审计日志,确保可追溯,为多代理协作提供了可靠的信息交互支撑。
(1)下面代码的功能是检查两个代理是否允许通信,确保代理间交互符合系统规则。原理是基于发送者身份(是否为管理员)、发送者与接收者是否为同一代理、接收者是否为管理员代理以及双方是否均处于活跃状态等条件,判断通信权限并返回结果及原因。
def _can_agents_communicate(sender_id: str, recipient_id: str, is_admin: bool) -> tuple[bool, str]: """ 检查两个代理是否允许通信。 参数: sender_id: 发送代理的ID recipient_id: 接收代理的ID is_admin: 发送者是否具有管理员权限 返回: 包含(是否允许: bool, 原因: str)的元组 """ # 管理员可以与任何代理通信 if is_admin: return True, "管理员权限" # 不允许自我通信(应使用内部方法) if sender_id == recipient_id: return False, "不允许自我通信" # 管理员代理始终可以被联系 if recipient_id == "admin" or recipient_id.lower().startswith("admin"): return True, "管理员代理始终可被联系" # 检查接收者是否允许来自发送者的通信 # 此处可扩展为数据库中的权限系统 # 目前使用简单规则:两个代理均活跃时可通信 if sender_id in g.active_agents and recipient_id in g.active_agents: return True, "两个代理均处于活跃状态" # 检查代理是否处于同一任务上下文中 # (这需要额外的任务关系检查) return False, "这些代理之间不允许通信"(2)下面代码的功能是实现代理间消息发送,支持权限校验和多种投递方式。原理是先验证发送者身份和消息合法性,通过通信权限检查后,生成唯一消息ID并存储到数据库,再根据指定方式(tmux实时投递、存储待取或两者结合)尝试投递消息,记录投递状态并返回结果。
async def send_agent_message_tool_impl(arguments: Dict[str, Any]) -> List[mcp_types.TextContent]: """ 带权限检查的代理间消息发送。 消息可通过tmux会话投递或存储供后续检索。 """ sender_token = arguments.get("token") recipient_id = arguments.get("recipient_id") message_content = arguments.get("message") message_type = arguments.get("message_type", "text") # 文本、协助请求、任务更新 priority = arguments.get("priority", "normal") # 低、正常、高、紧急 deliver_method = arguments.get("deliver_method", "tmux") # tmux、存储、两者皆有 # 身份验证 sender_id = get_agent_id(sender_token) if not sender_id: return [mcp_types.TextContent(type="text", text="未授权:需要有效令牌")] # 验证 if not recipient_id or not message_content: return [mcp_types.TextContent(type="text", text="错误:需要recipient_id和message")] if len(message_content) > 4000: # 合理的消息大小限制 return [mcp_types.TextContent(type="text", text="错误:消息过长(最大4000字符)")] # 停止命令仅限管理员使用 is_admin = verify_token(sender_token, "admin") if message_type == "stop_command" and not is_admin: return [mcp_types.TextContent(type="text", text="错误:仅管理员可发送停止命令")] # 权限检查 can_communicate, reason = _can_agents_communicate(sender_id, recipient_id, is_admin) if not can_communicate: return [mcp_types.TextContent(type="text", text=f"通信被拒绝:{reason}")] # 创建消息数据 message_id = _generate_message_id() timestamp = datetime.datetime.now().isoformat() message_data = { "message_id": message_id, "sender_id": sender_id, "recipient_id": recipient_id, "message_content": message_content, "message_type": message_type, "priority": priority, "timestamp": timestamp, "delivered": False, "read": False } conn = None try: conn = get_db_connection() cursor = conn.cursor() # 将消息存储到数据库 cursor.execute(""" INSERT INTO agent_messages (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, False, False)) # 根据方式尝试投递 delivery_status = "stored" # 默认状态:已存储(3)下面代码的功能是检索代理的消息,支持多种筛选条件。原理是验证代理身份后,根据是否包含发送/接收消息、消息类型、未读状态等筛选条件从数据库查询消息,可自动标记接收的消息为已读,最后格式化消息列表并返回。
async def get_agent_messages_tool_impl(arguments: Dict[str, Any]) -> List[mcp_types.TextContent]: """ 检索代理的消息。 """ agent_token = arguments.get("token") include_sent = arguments.get("include_sent", False) include_received = arguments.get("include_received", True) mark_as_read = arguments.get("mark_as_read", True) limit = arguments.get("limit", 20) message_type_filter = arguments.get("message_type") unread_only = arguments.get("unread_only", False) # 身份验证 agent_id = get_agent_id(agent_token) if not agent_id: return [mcp_types.TextContent(type="text", text="未授权:需要有效令牌")] # 验证limit参数 try: limit = int(limit) if not (1 <= limit <= 100): limit = 20 except (ValueError, TypeError): limit = 20 conn = None try: conn = get_db_connection() cursor = conn.cursor() # 构建查询 query_conditions = [] query_params = [] if include_received and include_sent: query_conditions.append("(recipient_id = ? OR sender_id = ?)") query_params.extend([agent_id, agent_id]) elif include_received: query_conditions.append("recipient_id = ?") query_params.append(agent_id) elif include_sent: query_conditions.append("sender_id = ?") query_params.append(agent_id) else: return [mcp_types.TextContent(type="text", text="错误:必须包含发送或接收的消息")] if message_type_filter: query_conditions.append("message_type = ?") query_params.append(message_type_filter) if unread_only: query_conditions.append("read = ?") query_params.append(False) where_clause = " AND ".join(query_conditions) query = f""" SELECT message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read FROM agent_messages WHERE {where_clause} ORDER BY timestamp DESC LIMIT ? """ query_params.append(limit) cursor.execute(query, query_params) messages = cursor.fetchall() # 如果需要,将接收的消息标记为已读 if mark_as_read and include_received: message_ids_to_mark = [msg["message_id"] for msg in messages if msg["recipient_id"] == agent_id and not msg["read"]] if message_ids_to_mark: placeholders = ",".join("?" * len(message_ids_to_mark)) cursor.execute(f"UPDATE agent_messages SET read = ? WHERE message_id IN ({placeholders})", [True] + message_ids_to_mark) conn.commit()