news 2026/2/8 20:44:01

LangChain Use Cases - Multi-Agent Scheduler System

张小明

Front-end Engineer


Table of Contents

  • Objectives
  • Tech Stack & Core Dependencies
  • Environment Setup
  • Implementation
  • Results
  • Implementation Approach
  • Extension Suggestions
  • Summary

Objectives

This case study implements a multi-agent scheduling system that extracts time information from natural-language queries, performs scheduled information retrieval, and delivers the results by email. The architecture consists of a Query Analysis Agent, a Search Router, a Response Agent, and a Scheduling System, automating the entire pipeline from natural-language query to scheduled retrieval and email delivery.

Main features:

  • Natural-language query parsing and time extraction
  • Support for multiple search types (academic papers, news, general web search)
  • Task scheduling and execution management
  • Search-result formatting and email delivery
  • Task status tracking and persistent storage

Tech Stack & Core Dependencies

Main stack
  • Language: Python 3.x
  • AI framework: LangChain, OpenAI GPT-4
  • Task scheduling: schedule
  • Email delivery: yagmail
  • Vector database: ChromaDB
  • Search APIs: arXiv API, NewsAPI, SerpAPI
Core dependencies
  • langchain - core framework for building AI applications
  • langchain-openai - OpenAI model integration
  • schedule - task scheduling
  • yagmail - email delivery
  • chromadb - vector database storage
  • requests - HTTP request handling
  • urllib - URL handling and requests (standard library)

Environment Setup

API Key Configuration

The system requires the following API keys:

Environment variables

import os

# OpenAI API key
os.environ["OPENAI_API_KEY"] = "your_openai_api_key"
# NewsAPI key
os.environ["NEWS_API_KEY"] = "your_news_api_key"
# SerpAPI key
os.environ["SERPAPI_API_KEY"] = "your_serpapi_key"
Email Configuration

The system sends mail through Gmail, which requires a Gmail app password:

Note: two-step verification must be enabled on the Google account, and an app password generated in the Google account security settings.

Email settings

email_config = {
    "username": "your_email@gmail.com",
    "password": "your_app_password",  # Gmail app password
}
Dependency Installation

requirements.txt

langchain
langchain-openai
langchain-community
schedule
yagmail
chromadb
requests
pytz

Implementation

1. Query Analysis Agent

The Query Analysis Agent parses natural-language queries and extracts the time information and search requirements.

QueryAnalysisAgent class

from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import HumanMessage
import json
import re
from datetime import datetime, timedelta
import pytz


class QueryAnalysisAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
        self.setup_prompts()

    def setup_prompts(self):
        # Time-extraction prompt
        self.time_extraction_prompt = PromptTemplate.from_template(
            """Extract time information from the following query.

Query: {query}

Return a JSON object with the following fields:
- target_time: The specific time mentioned in the query (in YYYY-MM-DD HH:MM:SS format)
- execution_time: The time to execute the task (5 minutes before target_time)
- time_sensitivity: "urgent" if the query needs immediate results, "normal" otherwise

If no specific time is mentioned, use current time as target_time."""
        )

        # Task-analysis prompt
        self.task_analysis_prompt = PromptTemplate.from_template(
            """Analyze the following query to determine search requirements.

Query: {query}
Time info: {time_info}

Return a JSON object with the following fields:
- task_type: "search" or other task types
- search_type: "research_paper", "news", or "general"
- keywords: List of relevant keywords for the search
- requirements: Any specific requirements mentioned
- time_sensitivity: "urgent" or "normal"
"""
        )

    def extract_time(self, query: str) -> dict:
        """Extract time information from the query."""
        try:
            # Use the LLM to extract time information
            response = self.time_extraction_prompt.format(query=query)
            response = self.llm.invoke(response)
            time_info = json.loads(response.content)

            # Post-process the time information
            if "target_time" in time_info:
                # Convert relative times (e.g. "7 AM") into concrete datetimes
                target_time = self._parse_relative_time(time_info["target_time"])
                time_info["target_time"] = target_time
                # Execute 5 minutes before the target time
                execution_time = target_time - timedelta(minutes=5)
                time_info["execution_time"] = execution_time
            return time_info
        except Exception:
            # Fallback: current time + 1 hour as the target time
            now = datetime.now(pytz.utc)
            target_time = now + timedelta(hours=1)
            execution_time = target_time - timedelta(minutes=5)
            return {
                "target_time": target_time,
                "execution_time": execution_time,
                "time_sensitivity": "normal",
            }

    def analyze_task(self, query: str, time_info: dict) -> dict:
        """Analyze the task type and search requirements of the query."""
        try:
            # Use the LLM to analyze the task; default=str keeps datetime
            # values in time_info JSON-serializable
            response = self.task_analysis_prompt.format(
                query=query, time_info=json.dumps(time_info, default=str)
            )
            response = self.llm.invoke(response)
            task_info = json.loads(response.content)

            # Attach the original query and a status flag
            task_info["original_query"] = query
            task_info["status"] = "success"
            return task_info
        except Exception:
            # Fallback: treat as a general search
            return {
                "task_type": "search",
                "search_type": "general",
                "keywords": query.split(),
                "requirements": "minimum 5 results",
                "time_sensitivity": "normal",
                "original_query": query,
                "status": "success",
            }

    def analyze_query(self, query: str) -> dict:
        """Run the full query-analysis pipeline."""
        # Extract time information
        time_info = self.extract_time(query)
        # Analyze the search requirements
        task_info = self.analyze_task(query, time_info)
        # Merge the results
        result = {**time_info, **task_info}
        return result

    def _parse_relative_time(self, time_str: str) -> datetime:
        """Parse a relative time string into a concrete datetime."""
        now = datetime.now(pytz.utc)

        # Handle common formats, e.g. "7 AM", "3 PM", "14:15"
        time_match = re.search(r'(\d{1,2})(?::(\d{2}))?\s*(AM|PM|am|pm)?', time_str)
        if time_match:
            hour = int(time_match.group(1))
            minute = int(time_match.group(2)) if time_match.group(2) else 0
            period = time_match.group(3)

            # Handle AM/PM
            if period and period.upper() == "PM" and hour != 12:
                hour += 12
            elif period and period.upper() == "AM" and hour == 12:
                hour = 0

            # Target time today
            target_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            # If that time has already passed, schedule for tomorrow
            if target_time < now:
                target_time += timedelta(days=1)
            return target_time

        # Default: current time + 1 hour
        return now + timedelta(hours=1)
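The relative-time logic above can be exercised in isolation. The sketch below reimplements the same regex parsing with only the standard library (no pytz) and takes `now` as a parameter so the rollover behavior is easy to verify; the function name and signature are illustrative, not part of the original class.

```python
import re
from datetime import datetime, timedelta, timezone

def parse_relative_time(time_str: str, now: datetime) -> datetime:
    """Parse strings like '7 AM', '3 PM', or '14:15' relative to `now`."""
    match = re.search(r"(\d{1,2})(?::(\d{2}))?\s*(AM|PM|am|pm)?", time_str)
    if not match:
        return now + timedelta(hours=1)  # fallback mirrors the agent's default
    hour = int(match.group(1))
    minute = int(match.group(2)) if match.group(2) else 0
    period = match.group(3)
    if period and period.upper() == "PM" and hour != 12:
        hour += 12
    elif period and period.upper() == "AM" and hour == 12:
        hour = 0
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target < now:  # already passed today, so schedule for tomorrow
        target += timedelta(days=1)
    return target

now = datetime(2025, 2, 8, 12, 0, tzinfo=timezone.utc)
print(parse_relative_time("3 PM", now))  # 2025-02-08 15:00:00+00:00
print(parse_relative_time("7 AM", now))  # rolls over to 2025-02-09 07:00:00+00:00
```

Note that "7 AM" asked for at noon lands on the next day, which is exactly the behavior the agent relies on for morning queries issued in the afternoon.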
2. Search Router

The Search Router acts as an intelligent traffic controller, routing each request to the most suitable specialized search agent based on the query analysis.

SearchRouter class

import urllib.request
import xml.etree.ElementTree as ET
import requests
from urllib.parse import urlencode
from langchain_community.utilities import SerpAPIWrapper
from typing import Dict, Any, List
from datetime import datetime, timedelta
import pytz


class SearchRouter:
    def __init__(self):
        # Initialize the specialized search agents
        self.paper_search_agent = PaperSearchAgent()
        self.news_search_agent = NewsSearchAgent()
        self.general_search_agent = GeneralSearchAgent()

    def route_and_search(self, query_analysis: dict) -> dict:
        """Route the search request to the appropriate agent.

        Args:
            query_analysis (dict): Analysis result from QueryAnalysisAgent.

        Returns:
            dict: Search results, including success/failure status and metadata.
        """
        try:
            # Determine the search type
            search_type = query_analysis.get("search_type")

            # Record the start time for logging
            start_time = datetime.now(pytz.utc)

            # Dispatch the search
            if search_type == "research_paper":
                print("Performing research paper search...")
                result = self.paper_search_agent.perform_search(query_analysis)
            elif search_type == "news":
                print("Performing news search...")
                result = self.news_search_agent.perform_search(query_analysis)
            elif search_type == "general":
                print("Performing general search...")
                result = self.general_search_agent.perform_search(query_analysis)
            else:
                return {
                    "status": "error",
                    "error_message": f"Unsupported search type: {search_type}",
                    "original_query": query_analysis.get("original_query"),
                }

            # Compute the search duration
            end_time = datetime.now(pytz.utc)
            search_duration = (end_time - start_time).total_seconds()

            # Attach metadata to the result
            result.update({
                "search_type": search_type,
                "search_duration": search_duration,
                "search_timestamp": end_time.isoformat(),
                "original_query": query_analysis.get("original_query"),
            })
            return result
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "original_query": query_analysis.get("original_query"),
                "search_type": query_analysis.get("search_type"),
            }
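The if/elif chain in the router can equivalently be written as a dispatch table, which makes adding a new search type a one-line change. The sketch below uses stand-in handler functions rather than the real agents, so the routing behavior itself is testable in isolation.

```python
# Dispatch-table variant of the routing logic (illustrative sketch; the
# handlers are stand-ins for the real search agents).
def paper_search(q):   return {"status": "success", "agent": "paper"}
def news_search(q):    return {"status": "success", "agent": "news"}
def general_search(q): return {"status": "success", "agent": "general"}

ROUTES = {
    "research_paper": paper_search,
    "news": news_search,
    "general": general_search,
}

def route(query_analysis: dict) -> dict:
    handler = ROUTES.get(query_analysis.get("search_type"))
    if handler is None:  # mirrors the router's unsupported-type error path
        return {
            "status": "error",
            "error_message": f"Unsupported search type: {query_analysis.get('search_type')}",
        }
    return handler(query_analysis)

print(route({"search_type": "news"}))  # {'status': 'success', 'agent': 'news'}
```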
3. Specialized Search Agents

The system includes three specialized search agents: PaperSearchAgent, NewsSearchAgent, and GeneralSearchAgent.

3.1 PaperSearchAgent - Academic Paper Search

PaperSearchAgent class

class PaperSearchAgent:
    def __init__(self):
        self.base_url = "http://export.arxiv.org/api/query"

    def perform_search(self, query_info: Dict[str, Any]) -> Dict[str, Any]:
        try:
            keywords = self._process_keywords(query_info["keywords"])
            max_results = self._extract_max_results(query_info.get("requirements", ""))
            url = (
                f"{self.base_url}?search_query=all:{keywords}"
                f"&start=0&max_results={max_results}"
            )
            response = urllib.request.urlopen(url)
            data = response.read().decode("utf-8")
            results = self._parse_arxiv_results(data)
            return {
                "status": "success",
                "results": results,
                "total_found": len(results),
                "returned_count": len(results),
                "query_info": query_info,
            }
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def _process_keywords(self, keywords: List[str]) -> str:
        # Drop time-related keywords before building the query
        filtered_keywords = [
            k for k in keywords
            if not any(
                time in k.lower()
                for time in ["hour", "morning", "afternoon", "evening"]
            )
        ]
        return "+".join(filtered_keywords)

    def _extract_max_results(self, requirements: str) -> int:
        import re
        # Pull the first number out of the requirements string
        numbers = re.findall(r"\d+", requirements)
        return int(numbers[0]) if numbers else 5

    def _parse_arxiv_results(self, data: str) -> List[Dict[str, Any]]:
        root = ET.fromstring(data)
        results = []
        for entry in root.findall("{http://www.w3.org/2005/Atom}entry"):
            title = entry.find("{http://www.w3.org/2005/Atom}title").text
            url = entry.find("{http://www.w3.org/2005/Atom}id").text
            published = entry.find("{http://www.w3.org/2005/Atom}published").text
            summary = entry.find("{http://www.w3.org/2005/Atom}summary").text
            results.append({
                "type": "research_paper",
                "title": title,
                "url": url,
                "published_date": published[:10],
                "summary": summary,
                "source": "arxiv",
            })
        return results
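The Atom parsing step can be verified without hitting the arXiv API at all. The sketch below runs the same ElementTree extraction against a minimal hand-written feed; the feed content is sample data for illustration, not a real API response.

```python
import xml.etree.ElementTree as ET

ATOM = "{http://www.w3.org/2005/Atom}"

# A minimal hand-written Atom feed in the shape arXiv returns (sample data).
sample_feed = """<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <entry>
    <title>Example RAG Paper</title>
    <id>http://arxiv.org/abs/0000.00000</id>
    <published>2025-01-15T00:00:00Z</published>
    <summary>An example abstract.</summary>
  </entry>
</feed>"""

def parse_arxiv(data: str):
    """Extract title/url/date/summary from each Atom entry."""
    root = ET.fromstring(data)
    return [
        {
            "title": e.find(ATOM + "title").text,
            "url": e.find(ATOM + "id").text,
            "published_date": e.find(ATOM + "published").text[:10],  # keep YYYY-MM-DD
            "summary": e.find(ATOM + "summary").text,
        }
        for e in root.findall(ATOM + "entry")
    ]

papers = parse_arxiv(sample_feed)
print(papers[0]["published_date"])  # 2025-01-15
```

The `[:10]` slice is the same trick the agent uses to trim the ISO timestamp down to a date.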
3.2 NewsSearchAgent - News Search

NewsSearchAgent class

class NewsSearchAgent:
    def __init__(self, api_key: str = None):
        """Initialize the news search agent with NewsAPI.

        NewsAPI follows a REST structure with base URL 'https://newsapi.org/v2'.
        It offers two main endpoints:
        - /everything: searches the entire news archive.
        - /top-headlines: fetches the latest breaking headlines.
        """
        # Use the provided key, falling back to the environment variable
        self.api_key = api_key or os.environ.get("NEWS_API_KEY")
        if not self.api_key:
            raise ValueError("NewsAPI key is required")
        self.base_url = "https://newsapi.org/v2"

    def perform_search(
        self, query_info: Dict[str, Any], max_results: int = 5
    ) -> Dict[str, Any]:
        """Perform a news search based on the given query info.

        Args:
            query_info (Dict[str, Any]): Dictionary of search parameters.
            max_results (int): Maximum number of results to return.

        Returns:
            Dict[str, Any]: Search results or an error message.
        """
        try:
            # Extract the actual search terms (excluding time-related keywords)
            search_terms = query_info.get(
                "search_terms", query_info.get("keywords", [])
            )

            # Check whether this is a real-time news search
            is_realtime = query_info.get("time_sensitivity") == "urgent"
            from_date = datetime.now() - timedelta(hours=1 if is_realtime else 24)

            # Parameters for the 'everything' endpoint
            params = {
                "q": " ".join(search_terms),  # time-related keywords excluded
                "from": from_date.strftime("%Y-%m-%d"),
                "sortBy": "publishedAt",
                "language": "en",
                "apiKey": self.api_key,
            }

            # Build the API request URL
            url = f"{self.base_url}/everything?{urlencode(params)}"

            # Send the request
            response = requests.get(url)
            data = response.json()

            # Check the response status
            if response.status_code != 200:
                return {
                    "status": "error",
                    "error_message": data.get("message", "Unknown error"),
                    "query_info": query_info,
                }

            # Process and format the results
            articles = data.get("articles", [])
            formatted_results = []
            for article in articles[:max_results]:
                formatted_results.append({
                    "title": article.get("title"),
                    "description": article.get("description"),
                    "url": article.get("url"),
                    "published_at": article.get("publishedAt"),
                    "source": article.get("source", {}).get("name"),
                    "content": article.get("content"),
                })

            return {
                "status": "success",
                "results": formatted_results,
                "total_results": data.get("totalResults", 0),
                "returned_count": len(formatted_results),
                "search_parameters": {
                    "keywords": query_info["keywords"],
                    "from_date": from_date.strftime("%Y-%m-%d"),
                    "language": "en",
                },
            }
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def get_top_headlines(
        self, country: str = "us", category: str = None
    ) -> Dict[str, Any]:
        """Fetch top headlines.

        Args:
            country (str): Country code (default 'us' for the United States).
            category (str, optional): News category (e.g. business, technology).

        Returns:
            Dict[str, Any]: Top-headline results.
        """
        params = {"country": country, "apiKey": self.api_key}
        if category:
            params["category"] = category
        url = f"{self.base_url}/top-headlines?{urlencode(params)}"
        response = requests.get(url)
        return response.json()
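The URL construction is the part of this agent that is easy to get subtly wrong (spaces and special characters in the query). The sketch below isolates it with only the standard library; the key and helper name are placeholders for illustration.

```python
from urllib.parse import urlencode, parse_qs, urlparse

def build_news_url(terms, from_date, api_key):
    """Build an /everything request URL the way the agent does (sketch)."""
    params = {
        "q": " ".join(terms),
        "from": from_date,
        "sortBy": "publishedAt",
        "language": "en",
        "apiKey": api_key,
    }
    # urlencode percent-escapes every value, e.g. the space in "rag paper"
    return f"https://newsapi.org/v2/everything?{urlencode(params)}"

url = build_news_url(["rag", "paper"], "2025-02-07", "demo_key")
print(url)
```

Round-tripping the URL through `parse_qs` is a convenient way to confirm the parameters survived encoding intact.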
3.3 GeneralSearchAgent - General Web Search

GeneralSearchAgent class

class GeneralSearchAgent:
    def __init__(self, serpapi_key: str = None):
        if serpapi_key:
            os.environ["SERPAPI_API_KEY"] = serpapi_key
        self.search = SerpAPIWrapper()

    def setup_search_parameters(self, query_info: Dict[str, Any]) -> List[str]:
        """Build the search queries for a general search."""
        keywords = " ".join(query_info["keywords"])
        # Base search queries
        search_queries = [
            f"{keywords} lang:ko",  # Korean-language results
            keywords,               # general search
        ]
        return search_queries

    def perform_search(
        self, query_info: Dict[str, Any], max_results: int = 5
    ) -> Dict[str, Any]:
        """Run the general search and return results."""
        try:
            search_queries = self.setup_search_parameters(query_info)
            all_results = []
            for query in search_queries:
                raw_results = self.search.run(query)
                parsed_results = self._parse_general_results(raw_results)
                all_results.extend(parsed_results)

            # Sort by relevance score
            sorted_results = sorted(
                all_results, key=lambda x: x.get("relevance_score", 0), reverse=True
            )[:max_results]

            return {
                "status": "success",
                "results": sorted_results,
                "total_found": len(all_results),
                "returned_count": len(sorted_results),
                "query_info": query_info,
            }
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def _parse_general_results(self, raw_results: str) -> List[Dict[str, Any]]:
        """Parse raw general-search output into structured results."""
        parsed_results = []
        for result in raw_results.split("\n"):
            if not result.strip():
                continue
            parsed_results.append({
                "type": "general",
                "title": self._extract_title(result),
                "content": result,
                "url": self._extract_url(result),
                "relevance_score": self._calculate_relevance(result),
            })
        return parsed_results

    def _extract_title(self, result: str) -> str:
        """Extract a title from a result line."""
        return result.split(".")[0].strip()[:100]

    def _extract_url(self, result: str) -> str:
        """Extract a URL from a result line."""
        import re
        urls = re.findall(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[^\s]*", result)
        return urls[0] if urls else ""

    def _calculate_relevance(self, result: str) -> float:
        """Compute a relevance score for a search result."""
        relevance_score = 0.5  # base score
        # Boost the score for keyword matches
        keywords = ["official", "guide", "tutorial", "review", "recommendation"]
        lower_result = result.lower()
        for keyword in keywords:
            if keyword in lower_result:
                relevance_score += 0.1
        return min(relevance_score, 1.0)  # cap the score at 1.0
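The keyword-boost scoring used for ranking is simple enough to check standalone. The sketch below mirrors it, with one small addition (rounding) to avoid floating-point noise such as 0.7000000000000001 when several 0.1 boosts accumulate; that rounding is my addition, not in the original.

```python
def calculate_relevance(result: str) -> float:
    """Base score 0.5, +0.1 per matching signal keyword, capped at 1.0.
    Rounded to 2 decimals to keep repeated 0.1 additions exact (an addition
    over the original sketch)."""
    score = 0.5
    for keyword in ["official", "guide", "tutorial", "review", "recommendation"]:
        if keyword in result.lower():
            score += 0.1
    return round(min(score, 1.0), 2)

print(calculate_relevance("Official beginner guide"))  # 0.7
print(calculate_relevance("random text"))              # 0.5
```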
4. Response Agent

The Response Agent is the system's communication specialist: it turns raw search results into well-structured, readable content that matches the user's request.

ResponseAgent class

from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
import json


class ResponseAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
        self.setup_prompts()

    def setup_prompts(self):
        # Research-paper prompt
        self.paper_prompt = PromptTemplate.from_template(
            """Please organize the following research paper search results in email format.

Search term: {query}
Search results: {results}

Format as follows:
1. Email subject: "Research Paper Search Results for [search term]"
2. Body:
   - Greeting
   - "Here are the organized research paper results for your search."
   - Number each paper and format as follows:
     1. Paper title: [title]
        - Summary: [core research content and key findings]
        - URL: [link]
   - Closing remarks
"""
        )

        # News prompt
        self.news_prompt = PromptTemplate.from_template(
            """Please organize the following news search results in email format.

Search term: {query}
Search results: {results}

Format as follows:
1. Email subject: "Latest News Updates for [search term]"
2. Body:
   - Greeting
   - "Here are the latest news articles related to your search topic."
   - Number each news item and format as follows:
     1. [title] - [news source]
        - Main content: [key content summary]
        - Published date: [date]
        - URL: [link]
   - Closing remarks
"""
        )

        # General-search prompt
        self.general_prompt = PromptTemplate.from_template(
            """Please organize the following search results in email format.

Search term: {query}
Search results: {results}

Format as follows:
1. Email subject: "Search Results for [search term]"
2. Body:
   - Greeting
   - "Here are the organized results for your search."
   - Number each result and format as follows:
     1. [title]
        - Content: [main content summary]
        - Source: [website or platform name]
        - URL: [link]
   - Closing remarks
"""
        )

    def format_results(self, search_results):
        try:
            # Handle error results
            if search_results.get("status") == "error":
                return {
                    "subject": "Search Error Notification",
                    "body": f"An error occurred during search: {search_results.get('error_message', 'Unknown error')}",
                }

            # Handle empty results
            if not search_results.get("results"):
                return {
                    "subject": "No Search Results",
                    "body": f"No results found for search term '{search_results.get('original_query', '')}'.",
                }

            # Pick the prompt matching the search type
            search_type = search_results.get("search_type", "general")
            if search_type == "research_paper":
                prompt = self.paper_prompt
            elif search_type == "news":
                prompt = self.news_prompt
            else:
                prompt = self.general_prompt

            # Prepare the formatting input
            formatted_input = {
                "query": search_results.get("original_query", ""),
                "results": json.dumps(
                    search_results.get("results", []), ensure_ascii=False, indent=2
                ),
            }

            # Generate the response via the LLM
            response = prompt.format(**formatted_input)
            response = self.llm.invoke(response)

            try:
                # Try to parse the response as JSON
                return json.loads(response.content)
            except json.JSONDecodeError:
                # If JSON parsing fails, fall back to a default format
                return {
                    "subject": f"Search Results for [{formatted_input['query']}]",
                    "body": response.content,
                }
        except Exception as e:
            return {
                "subject": "Result Processing Error",
                "body": f"An error occurred while processing results: {str(e)}\n\nOriginal query: {search_results.get('original_query', '')}",
            }
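The try-JSON-then-fallback pattern at the end of `format_results` is worth isolating, since LLMs frequently return free text instead of the requested JSON. The sketch below exercises both paths with stub replies (the function name and sample strings are illustrative).

```python
import json

def to_email(query: str, llm_text: str) -> dict:
    """Parse the LLM reply as JSON; fall back to a plain subject/body pair."""
    try:
        return json.loads(llm_text)
    except json.JSONDecodeError:
        # Free-text reply: wrap it in the default envelope instead of failing
        return {"subject": f"Search Results for [{query}]", "body": llm_text}

# A structured reply parses straight through...
print(to_email("rag", '{"subject": "s", "body": "b"}'))
# ...while a free-text reply falls back to the default envelope.
print(to_email("rag", "Hello, here are your results"))
```

This is why the system always has a sendable subject/body pair, even when the model ignores the formatting instructions.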
5. Scheduled Search System

The ScheduledSearchSystem manages the full lifecycle of a search task, from scheduling through result delivery.

ScheduledSearchSystem class

import schedule
import time
import yagmail
import chromadb
from chromadb.config import Settings
import threading
from queue import Queue


class ScheduledSearchSystem:
    def __init__(self, email_config: Dict[str, Any]):
        print(f"\n[{self._get_current_time()}] Initializing system...")
        self.query_analyzer = QueryAnalysisAgent()
        self.search_router = SearchRouter()
        self.response_agent = ResponseAgent()
        self.client = chromadb.PersistentClient(path="./search_data")

        # Email configuration
        self.email_config = email_config
        self.yag = yagmail.SMTP(email_config["username"], email_config["password"])
        print(f"[{self._get_current_time()}] Email client configuration complete")

        self.scheduled_tasks = {}
        self.setup_collections()

        # Completion flag
        self.is_completed = False
        self.completion_event = threading.Event()

        # Start the scheduler thread
        self.scheduler_thread = threading.Thread(target=self._run_scheduler)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        print(f"[{self._get_current_time()}] System initialization complete\n")

    def _get_current_time(self):
        """Return the current time as a string."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def setup_collections(self):
        """Set up the ChromaDB collections."""
        print(f"[{self._get_current_time()}] Starting ChromaDB collection setup...")
        self.results_collection = self.client.get_or_create_collection(
            name="search_results", metadata={"description": "Raw search results"}
        )
        self.formatted_collection = self.client.get_or_create_collection(
            name="formatted_responses",
            metadata={"description": "Formatted responses for email delivery"},
        )
        self.schedule_collection = self.client.get_or_create_collection(
            name="scheduled_tasks",
            metadata={"description": "Scheduled search and email tasks"},
        )
        print(f"[{self._get_current_time()}] ChromaDB collection setup complete")

    def schedule_task(self, query: str, user_email: str) -> Dict[str, Any]:
        """Schedule a task."""
        try:
            print(f"\n[{self._get_current_time()}] Starting new task scheduling...")
            print(f"Query: {query}")
            print(f"Email: {user_email}")

            # Analyze the query
            query_analysis = self.query_analyzer.analyze_query(query)
            execution_time = query_analysis["execution_time"]
            target_time = query_analysis["target_time"]
            print(f"Scheduled search execution time: {execution_time}")
            print(f"Scheduled email delivery time: {target_time}")

            # Generate a task ID
            schedule_id = f"task_{datetime.now(pytz.UTC).timestamp()}"
            print(f"Generated task ID: {schedule_id}")

            # Save the task information
            task_info = {
                "query": query,
                "email": user_email,
                "execution_time": execution_time.isoformat(),
                "target_time": target_time.isoformat(),
                "search_type": query_analysis["search_type"],
                "status": "scheduled",
            }
            self.scheduled_tasks[schedule_id] = task_info

            # Persist to ChromaDB
            print(f"[{self._get_current_time()}] Saving task information to ChromaDB...")
            self.schedule_collection.add(
                documents=[json.dumps(task_info)],
                metadatas=[{"type": "schedule", "status": "pending"}],
                ids=[schedule_id],
            )
            print(f"[{self._get_current_time()}] Task information saved")

            # Schedule the search execution (note: schedule keeps only the
            # time of day, so the job fires daily at this HH:MM)
            execution_time_str = execution_time.strftime("%H:%M")
            schedule.every().day.at(execution_time_str).do(
                self.execute_search, schedule_id=schedule_id
            ).tag(schedule_id)
            print(f"[{self._get_current_time()}] Search task scheduling complete")

            return {
                "status": "success",
                "message": "Task successfully scheduled",
                "schedule_id": schedule_id,
                "execution_time": execution_time,
                "target_time": target_time,
            }
        except Exception as e:
            print(f"[{self._get_current_time()}] Task scheduling failed: {str(e)}")
            return {"status": "error", "error_message": str(e)}

    def execute_search(self, schedule_id: str) -> bool:
        """Execute a scheduled search."""
        try:
            print(f"\n[{self._get_current_time()}] Starting search execution (ID: {schedule_id})")
            task_info = self.scheduled_tasks.get(schedule_id)
            if not task_info:
                print(f"[{self._get_current_time()}] Task information not found")
                return False

            print(f"[{self._get_current_time()}] Analyzing search query...")
            query_analysis = self.query_analyzer.analyze_query(task_info["query"])

            print(f"[{self._get_current_time()}] Performing search...")
            search_results = self.search_router.route_and_search(query_analysis)

            print(f"[{self._get_current_time()}] Formatting search results...")
            formatted_response = self.response_agent.format_results(search_results)

            # Save the results
            print(f"[{self._get_current_time()}] Saving search results to ChromaDB...")
            response_id = f"response_{schedule_id}"
            self.formatted_collection.add(
                documents=[json.dumps(formatted_response)],
                metadatas=[
                    {
                        "schedule_id": schedule_id,
                        "email": task_info["email"],
                        "target_time": task_info["target_time"],
                    }
                ],
                ids=[response_id],
            )
            print(f"[{self._get_current_time()}] Search results saved")

            # Schedule the email delivery
            target_time = datetime.fromisoformat(task_info["target_time"])
            target_time_str = target_time.strftime("%H:%M")
            schedule.every().day.at(target_time_str).do(
                self.send_email, schedule_id=schedule_id
            ).tag(f"email_{schedule_id}")
            print(f"[{self._get_current_time()}] Email delivery scheduled (Time: {target_time_str})")
            return True
        except Exception as e:
            print(f"[{self._get_current_time()}] Search execution failed: {str(e)}")
            return False

    def send_email(self, schedule_id: str) -> bool:
        """Send the result email."""
        try:
            print(f"\n[{self._get_current_time()}] Starting email delivery (ID: {schedule_id})")
            response_id = f"response_{schedule_id}"

            print(f"[{self._get_current_time()}] Retrieving saved search results...")
            response_results = self.formatted_collection.get(ids=[response_id])
            if not response_results["documents"]:
                print(f"[{self._get_current_time()}] Search results not found")
                return False

            formatted_response = json.loads(response_results["documents"][0])
            metadata = response_results["metadatas"][0]

            print(f"[{self._get_current_time()}] Sending email...")
            print(f"Recipient: {metadata['email']}")
            print(f"Subject: {formatted_response['subject']}")
            self.yag.send(
                to=metadata["email"],
                subject=formatted_response["subject"],
                contents=formatted_response["body"],
            )
            print(f"[{self._get_current_time()}] Email sent successfully")

            # Update the task status
            print(f"[{self._get_current_time()}] Updating task status...")
            task_info = self.scheduled_tasks[schedule_id]
            task_info["status"] = "completed"
            self.schedule_collection.update(
                documents=[json.dumps(task_info)], ids=[schedule_id]
            )

            # Clear the schedule entry
            schedule.clear(f"email_{schedule_id}")
            print(f"[{self._get_current_time()}] Task completion processing complete\n")

            # Signal completion
            self.is_completed = True
            self.completion_event.set()
            print(f"[{self._get_current_time()}] All tasks completed. Shutting down system.\n")
            return True
        except Exception as e:
            print(f"[{self._get_current_time()}] Email delivery failed: {str(e)}")
            return False

    def _run_scheduler(self):
        """Run the scheduler loop."""
        print(f"[{self._get_current_time()}] Scheduler started...")
        while not self.is_completed:
            schedule.run_pending()
            time.sleep(1)  # check once per second

    def wait_for_completion(self, timeout=None):
        """Block until the task completes (or the timeout expires)."""
        try:
            completed = self.completion_event.wait(timeout=timeout)
            if not completed:
                print(f"[{self._get_current_time()}] Task completion timeout")
            if hasattr(self, "yag"):
                self.yag.close()
        except KeyboardInterrupt:
            print(f"\n[{self._get_current_time()}] Terminated by user.")
            if hasattr(self, "yag"):
                self.yag.close()
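One detail worth pinning down: `schedule.every().day.at(...)` accepts only an "HH:MM" string, so the date part of the computed times is discarded and only the time of day is registered. The sketch below shows how the two strings handed to the scheduler are derived (the helper name is illustrative, not part of the class).

```python
from datetime import datetime, timedelta

def schedule_times(target_time: datetime):
    """Derive the search slot (target - 5 min) and the two HH:MM strings
    actually handed to schedule.every().day.at(), which keeps only
    time-of-day."""
    execution_time = target_time - timedelta(minutes=5)
    return execution_time.strftime("%H:%M"), target_time.strftime("%H:%M")

search_at, email_at = schedule_times(datetime(2025, 2, 8, 15, 0))
print(search_at, email_at)  # 14:55 15:00
```

Note the midnight edge case: a 00:02 target yields a 23:57 search slot, i.e. the previous calendar day, which a production version would need to handle explicitly.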
6. Usage Example

Usage example

# System configuration
email_config = {
    "username": "your_email@gmail.com",
    "password": "your_app_password",
}

print("\n=== Starting Scheduling System Test ===\n")

# Initialize the system
system = ScheduledSearchSystem(email_config)

# Schedule a task
result = system.schedule_task(
    query="find Modular RAG paper at 14:15 PM",  # example: 2:15 PM
    user_email="recipient@gmail.com",
)

print("\n=== Scheduling Result ===")
# default=str: the result contains datetime objects, which json.dumps
# cannot serialize on its own
print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
print("\n=== Task will execute at scheduled time... ===\n")

# Wait for completion (up to 4 hours)
system.wait_for_completion(timeout=14400)
7. Test Functions

Test functions

def datetime_handler(obj):
    """Convert datetime objects into JSON-serializable strings.

    Args:
        obj: The object to convert.

    Returns:
        str: Formatted datetime string (format: YYYY-MM-DD HH:MM:SS+ZZZZ).

    Raises:
        TypeError: If the object is not a datetime instance.
    """
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S%z")
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")


def test_search_system():
    """Test the search system end to end."""
    # Initialize the components
    query_analyzer = QueryAnalysisAgent()
    search_router = SearchRouter()
    response_agent = ResponseAgent()

    # Test queries
    test_queries = [
        "recommend place to eat at seoul in 7 pm",
        "find rag persona paper at 3 pm",
        "find news us president speech in 7 am",
    ]

    for query in test_queries:
        print(f"\n{'='*60}")
        print(f"Test Query: {query}")
        print(f"{'='*60}")
        try:
            query_analysis = query_analyzer.analyze_query(query)
            print("\n1. Query Analysis Results:")
            print(
                json.dumps(
                    query_analysis,
                    indent=2,
                    ensure_ascii=False,
                    default=datetime_handler,
                )
            )
            if query_analysis["status"] != "success":
                print("Query analysis failed!")
                continue

            search_results = search_router.route_and_search(query_analysis)
            print("\n2. Search Results:")
            print(
                json.dumps(
                    search_results,
                    indent=2,
                    ensure_ascii=False,
                    default=datetime_handler,
                )
            )

            # Format the results
            print("\n3. Formatted Results:")
            formatted_results = response_agent.format_results(search_results)
            print(json.dumps(formatted_results, indent=2, ensure_ascii=False))
        except Exception as e:
            print(f"Error occurred during test: {str(e)}")


if __name__ == "__main__":
    test_search_system()

Results

The multi-agent scheduling system delivers the following:

  • Natural-language understanding: the system parses queries that embed time information, such as "find rag persona paper at 3 pm"
  • Intelligent search routing: the most suitable search type (academic papers, news, or general web search) is chosen automatically from the query content
  • Scheduled execution: search tasks run automatically at the scheduled time, starting 5 minutes before the target time
  • Result formatting: raw search results are converted into a structured, readable email format
  • Automatic email delivery: the formatted results are sent to the specified mailbox at the target time
  • Task status tracking: task information and search results are persisted in ChromaDB
Sample Output

Query analysis result

{
  "target_time": "2025-02-08 15:00:00+0000",
  "execution_time": "2025-02-08 14:55:00+0000",
  "task_type": "search",
  "search_type": "research_paper",
  "keywords": ["rag", "persona", "paper", "3", "pm"],
  "requirements": "minimum 5 results",
  "time_sensitivity": "normal",
  "original_query": "find rag persona paper at 3 pm",
  "status": "success"
}

Formatted email content

{
  "subject": "Search Results for [find rag persona paper at 3 pm]",
  "body": "Subject: Search Results for \"find rag persona paper at 3 pm\"\n\n---\n\nHello,\n\nHere are the organized research paper results for your search:\n\n1. Paper title: PeaCoK: Persona Commonsense Knowledge for Consistent and Engaging Narratives\n   - Summary: This paper introduces PeaCoK, a method for incorporating persona-based commonsense knowledge into narrative generation systems...\n   - URL: http://arxiv.org/abs/2305.02364v2\n\n2. Paper title: RAG vs. Fine-tuning: Pipelines, Tradeoffs, and a Case Study on Agriculture\n   - Summary: This paper presents a comprehensive comparison between RAG and fine-tuning approaches...\n   - URL: http://arxiv.org/abs/2401.15868\n\n...\n\nThank you for using our service. Should you have any further questions or need additional information, feel free to reach out.\n\nBest regards,\n\n[Your Name]"
}

Implementation Approach

The multi-agent scheduling system is built on the following ideas:

1. Modular design

The system decomposes its functionality into independent agent components:

  • QueryAnalysisAgent: natural-language understanding and query analysis
  • SearchRouter: routing each query type to the appropriate search agent
  • Specialized search agents: executing specific kinds of search tasks
  • ResponseAgent: result formatting and content generation
  • ScheduledSearchSystem: task scheduling and system coordination
2. Agent collaboration

The agents cooperate through standardized data formats and interfaces:

  • Query analysis results are passed around as JSON
  • A uniform search-result format simplifies downstream processing
  • A standardized email format ensures a consistent user experience
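Concretely, the "standardized format" is just plain dicts handed between agents, and `analyze_query` composes them with dict unpacking, where keys from the right-hand (task) dict win on collision. A tiny sketch with illustrative values:

```python
# How the analysis dicts compose: on key collisions, the right-hand
# (task_info) dict wins. Values below are illustrative.
time_info = {"target_time": "2025-02-08T15:00:00", "time_sensitivity": "normal"}
task_info = {
    "search_type": "research_paper",
    "time_sensitivity": "urgent",  # overrides the time_info value
    "original_query": "find rag persona paper at 3 pm",
}

merged = {**time_info, **task_info}
print(merged["time_sensitivity"])  # urgent
print(merged["target_time"])       # 2025-02-08T15:00:00
```

This ordering means the task analysis can refine fields the time extraction guessed, which is the intended precedence here.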
3. Time management

The system implements precise time management:

  • Time information is extracted from the natural-language query
  • The execution time is set 5 minutes before the target time
  • The schedule library drives the timed task execution
  • Both relative times (e.g. "7 AM") and absolute times are supported
4. Data persistence

ChromaDB provides data persistence:

  • Task information and status are stored
  • Search results and formatted responses are saved
  • Task status tracking and historical queries are supported
5. Asynchronous processing

The system uses asynchronous processing:

  • Threads keep operations non-blocking
  • The scheduler runs continuously in the background
  • Multiple tasks can be scheduled concurrently
6. Error handling

Comprehensive error handling is built in:

  • Every component catches and handles its own exceptions
  • Meaningful error messages are reported
  • System stability and reliability are preserved

Extension Suggestions

Starting from the current implementation, the following extensions are worth considering:

1. Stronger search capabilities
  • Add more specialized search agents (e.g. patent search, code search)
  • Deduplicate results and improve relevance ranking
  • Support multilingual search and result translation
  • Cache search results to improve response times
2. Smarter agents
  • Strengthen QueryAnalysisAgent's time parsing to handle more complex time expressions
  • Extend ResponseAgent with more output formats (e.g. PDF, Word)
  • Add result summarization and key-information extraction
  • Personalize result ranking based on user preferences
3. Richer scheduling
  • Support recurring tasks (daily, weekly, etc.)
  • Add task priority management and resource allocation
  • Support task dependencies and complex workflows
  • Add execution monitoring and performance statistics
4. Better notifications
  • Support more notification channels (e.g. SMS, instant messaging)
  • Allow customizable notification templates
  • Add notification priorities and grouping
  • Keep notification history and statistics
5. User interface & interaction
  • Build a web UI for graphical task management
  • Show task progress in real time
  • Add user authentication and permission management
  • Expose an API for third-party integration
6. System optimization
  • Move to a distributed architecture for scalability
  • Add load balancing and failover
  • Optimize resource usage for efficiency
  • Add comprehensive logging and monitoring

Summary

The multi-agent scheduling system is a comprehensive AI application case study. It shows how multiple agent components can work together to automate the full pipeline from natural-language query to scheduled information retrieval and email delivery.

The core value of this case study lies in:

  • Agent collaboration: several specialized agents cooperate to automate a complex task
  • Natural-language understanding: an LLM parses the query and extracts the key information
  • Task scheduling: precise time management and scheduling meet the user's timing requirements
  • Result processing: raw search results become a user-friendly format delivered by email

The system applies to many scenarios, such as:

  • Researchers receiving the latest papers on a schedule
  • Business users getting periodic industry news
  • Individuals receiving updates on specific topics at set times
  • Enterprises automating information collection and distribution

With further extension and tuning, the system can grow into a powerful information-management and automation tool that delivers personalized information services.
