Table of Contents
- Case Objectives
- Tech Stack and Core Dependencies
- Environment Setup
- Implementation
- Results
- Implementation Approach
- Extension Suggestions
- Summary
Case Objectives
This case implements a multi-agent scheduling system that extracts time information from natural language queries, performs scheduled information retrieval, and delivers the results by email. The architecture consists of a Query Analysis Agent, a Search Router, a Response Agent, and a Scheduling System, automating the full pipeline from natural language query to scheduled retrieval and email delivery.
Main features:
- Natural language query parsing and time extraction
- Multiple search types (research papers, news, general search)
- Task scheduling and execution management
- Search-result formatting and email delivery
- Task status tracking and persistent storage
Tech Stack and Core Dependencies
Main Tech Stack
- Language: Python 3.x
- AI framework: LangChain, OpenAI GPT-4
- Task scheduling: Schedule
- Email delivery: Yagmail
- Vector database: ChromaDB
- Search APIs: arXiv API, NewsAPI, SerpAPI
Core Dependencies
| Library | Purpose |
|---|---|
| langchain | Core framework for building AI applications |
| langchain-openai | OpenAI model integration |
| schedule | Task scheduling |
| yagmail | Email delivery |
| chromadb | Vector database storage |
| requests | HTTP request handling |
| urllib (standard library) | URL handling and requests |
Environment Setup
API Key Configuration
The system requires the following API keys:
Environment variable configuration
```python
import os

# OpenAI API key
os.environ["OPENAI_API_KEY"] = "your_openai_api_key"

# NewsAPI key
os.environ["NEWS_API_KEY"] = "your_news_api_key"

# SerpAPI key
os.environ["SERPAPI_API_KEY"] = "your_serpapi_key"
```
Email Configuration
The system sends email via Gmail, which requires a Gmail app password:
Note: you need to enable two-step verification in your Google account and generate an app password (Google Account security settings).
Email configuration
```python
email_config = {
    "username": "your_email@gmail.com",
    "password": "your_app_password",  # Gmail app password
}
```
Installing Dependencies
requirements.txt
```
langchain
langchain-openai
langchain-community
schedule
yagmail
chromadb
requests
pytz
```
Implementation
1. Query Analysis Agent
The Query Analysis Agent parses natural language queries and extracts time information and search requirements.
QueryAnalysisAgent class
```python
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
import json
import re
from datetime import datetime, timedelta
import pytz


class QueryAnalysisAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
        self.setup_prompts()

    def setup_prompts(self):
        # Time extraction prompt
        self.time_extraction_prompt = PromptTemplate.from_template(
            """Extract time information from the following query.

Query: {query}

Return a JSON object with the following fields:
- target_time: The specific time mentioned in the query (in YYYY-MM-DD HH:MM:SS format)
- execution_time: The time to execute the task (5 minutes before target_time)
- time_sensitivity: "urgent" if the query needs immediate results, "normal" otherwise

If no specific time is mentioned, use current time as target_time."""
        )

        # Task analysis prompt
        self.task_analysis_prompt = PromptTemplate.from_template(
            """Analyze the following query to determine search requirements.

Query: {query}
Time info: {time_info}

Return a JSON object with the following fields:
- task_type: "search" or other task types
- search_type: "research_paper", "news", or "general"
- keywords: List of relevant keywords for the search
- requirements: Any specific requirements mentioned
- time_sensitivity: "urgent" or "normal"
"""
        )

    def extract_time(self, query: str) -> dict:
        """Extract time information from the query."""
        try:
            # Use the LLM to extract time information
            prompt = self.time_extraction_prompt.format(query=query)
            response = self.llm.invoke(prompt)
            time_info = json.loads(response.content)

            # Post-process the extracted time
            if "target_time" in time_info:
                # Convert relative times (e.g. "7 AM") to concrete datetimes
                target_time = self._parse_relative_time(time_info["target_time"])
                time_info["target_time"] = target_time
                # Execution time is 5 minutes before the target time
                time_info["execution_time"] = target_time - timedelta(minutes=5)
            return time_info
        except Exception:
            # Fallback: target time is one hour from now
            now = datetime.now(pytz.utc)
            target_time = now + timedelta(hours=1)
            return {
                "target_time": target_time,
                "execution_time": target_time - timedelta(minutes=5),
                "time_sensitivity": "normal",
            }

    def analyze_task(self, query: str, time_info: dict) -> dict:
        """Analyze the task type and search requirements of the query."""
        try:
            # time_info contains datetime objects, so serialize with default=str
            prompt = self.task_analysis_prompt.format(
                query=query, time_info=json.dumps(time_info, default=str)
            )
            response = self.llm.invoke(prompt)
            task_info = json.loads(response.content)

            # Attach the original query and status
            task_info["original_query"] = query
            task_info["status"] = "success"
            return task_info
        except Exception:
            # Fallback: treat the query as a general search
            return {
                "task_type": "search",
                "search_type": "general",
                "keywords": query.split(),
                "requirements": "minimum 5 results",
                "time_sensitivity": "normal",
                "original_query": query,
                "status": "success",
            }

    def analyze_query(self, query: str) -> dict:
        """Full query-analysis pipeline."""
        # Extract time information
        time_info = self.extract_time(query)
        # Analyze the task requirements
        task_info = self.analyze_task(query, time_info)
        # Merge both result dicts
        return {**time_info, **task_info}

    def _parse_relative_time(self, time_str: str) -> datetime:
        """Parse a relative time string into a concrete datetime."""
        now = datetime.now(pytz.utc)

        # Handle common time formats, e.g. "7 AM", "3 PM", "14:15"
        time_match = re.search(r"(\d{1,2})(?::(\d{2}))?\s*(AM|PM|am|pm)?", time_str)
        if time_match:
            hour = int(time_match.group(1))
            minute = int(time_match.group(2)) if time_match.group(2) else 0
            period = time_match.group(3)

            # Handle AM/PM
            if period and period.upper() == "PM" and hour != 12:
                hour += 12
            elif period and period.upper() == "AM" and hour == 12:
                hour = 0

            # Target time today; roll over to tomorrow if already past
            target_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if target_time < now:
                target_time += timedelta(days=1)
            return target_time

        # Default: one hour from now
        return now + timedelta(hours=1)
```
2. Search Router
The Search Router acts as an intelligent traffic controller, routing each request to the most suitable specialized search agent based on the query analysis.
SearchRouter class
```python
import urllib.request
import xml.etree.ElementTree as ET
import requests
from urllib.parse import urlencode
from langchain_community.utilities import SerpAPIWrapper
from typing import Dict, Any, List
from datetime import datetime, timedelta
import pytz


class SearchRouter:
    def __init__(self):
        # Instantiate the specialized search agents
        self.paper_search_agent = PaperSearchAgent()
        self.news_search_agent = NewsSearchAgent()
        self.general_search_agent = GeneralSearchAgent()

    def route_and_search(self, query_analysis: dict) -> dict:
        """Route the search request to the appropriate agent.

        Args:
            query_analysis (dict): Analysis result from QueryAnalysisAgent.

        Returns:
            dict: Search results, including success/failure status and metadata.
        """
        try:
            search_type = query_analysis.get("search_type")

            # Record the start time for logging
            start_time = datetime.now(pytz.utc)

            # Dispatch to the matching agent
            if search_type == "research_paper":
                print("Performing research paper search...")
                result = self.paper_search_agent.perform_search(query_analysis)
            elif search_type == "news":
                print("Performing news search...")
                result = self.news_search_agent.perform_search(query_analysis)
            elif search_type == "general":
                print("Performing general search...")
                result = self.general_search_agent.perform_search(query_analysis)
            else:
                return {
                    "status": "error",
                    "error_message": f"Unsupported search type: {search_type}",
                    "original_query": query_analysis.get("original_query"),
                }

            # Compute the search duration
            end_time = datetime.now(pytz.utc)
            search_duration = (end_time - start_time).total_seconds()

            # Attach metadata to the result
            result.update({
                "search_type": search_type,
                "search_duration": search_duration,
                "search_timestamp": end_time.isoformat(),
                "original_query": query_analysis.get("original_query"),
            })
            return result
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "original_query": query_analysis.get("original_query"),
                "search_type": query_analysis.get("search_type"),
            }
```
3. Specialized Search Agents
The system includes three specialized search agents: PaperSearchAgent, NewsSearchAgent, and GeneralSearchAgent.
3.1 PaperSearchAgent - Academic Paper Search
PaperSearchAgent class
```python
class PaperSearchAgent:
    def __init__(self):
        self.base_url = "http://export.arxiv.org/api/query"

    def perform_search(self, query_info: Dict[str, Any]) -> Dict[str, Any]:
        try:
            keywords = self._process_keywords(query_info["keywords"])
            max_results = self._extract_max_results(query_info.get("requirements", ""))

            url = (
                f"{self.base_url}?search_query=all:{keywords}"
                f"&start=0&max_results={max_results}"
            )
            response = urllib.request.urlopen(url)
            data = response.read().decode("utf-8")
            results = self._parse_arxiv_results(data)

            return {
                "status": "success",
                "results": results,
                "total_found": len(results),
                "returned_count": len(results),
                "query_info": query_info,
            }
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def _process_keywords(self, keywords: List[str]) -> str:
        # Drop time-related keywords before building the arXiv query
        filtered_keywords = [
            k for k in keywords
            if not any(
                time in k.lower()
                for time in ["hour", "morning", "afternoon", "evening"]
            )
        ]
        return "+".join(filtered_keywords)

    def _extract_max_results(self, requirements: str) -> int:
        import re

        # Pull the first number out of the requirements text (default: 5)
        numbers = re.findall(r"\d+", requirements)
        return int(numbers[0]) if numbers else 5

    def _parse_arxiv_results(self, data: str) -> List[Dict[str, Any]]:
        root = ET.fromstring(data)
        results = []
        ns = "{http://www.w3.org/2005/Atom}"
        for entry in root.findall(f"{ns}entry"):
            results.append({
                "type": "research_paper",
                "title": entry.find(f"{ns}title").text,
                "url": entry.find(f"{ns}id").text,
                "published_date": entry.find(f"{ns}published").text[:10],
                "summary": entry.find(f"{ns}summary").text,
                "source": "arxiv",
            })
        return results
```
3.2 NewsSearchAgent - News Search
NewsSearchAgent class
```python
import os


class NewsSearchAgent:
    def __init__(self, api_key: str = None):
        """Initialize the news search agent with NewsAPI.

        NewsAPI is a REST API with base URL 'https://newsapi.org/v2'.
        It exposes two main endpoints:
        - /everything: search the full news archive.
        - /top-headlines: fetch the latest headlines.
        """
        # Prefer the explicit argument, then fall back to the environment
        self.api_key = api_key or os.environ.get("NEWS_API_KEY")
        if not self.api_key:
            raise ValueError("NewsAPI key is required")
        self.base_url = "https://newsapi.org/v2"

    def perform_search(
        self, query_info: Dict[str, Any], max_results: int = 5
    ) -> Dict[str, Any]:
        """Perform a news search for the given query info.

        Args:
            query_info (Dict[str, Any]): Dictionary of search parameters.
            max_results (int): Maximum number of results to return.

        Returns:
            Dict[str, Any]: Search results or an error message.
        """
        try:
            # Extract the actual search terms (excluding time-related keywords)
            search_terms = query_info.get(
                "search_terms", query_info.get("keywords", [])
            )

            # Use a shorter window for real-time news searches
            is_realtime = query_info.get("time_sensitivity") == "urgent"
            from_date = datetime.now() - timedelta(hours=1 if is_realtime else 24)

            # Parameters for the 'everything' endpoint
            params = {
                "q": " ".join(search_terms),
                "from": from_date.strftime("%Y-%m-%d"),
                "sortBy": "publishedAt",
                "language": "en",
                "apiKey": self.api_key,
            }

            # Build and send the API request
            url = f"{self.base_url}/everything?{urlencode(params)}"
            response = requests.get(url)
            data = response.json()

            # Check the response status
            if response.status_code != 200:
                return {
                    "status": "error",
                    "error_message": data.get("message", "Unknown error"),
                    "query_info": query_info,
                }

            # Process and format the results
            articles = data.get("articles", [])
            formatted_results = []
            for article in articles[:max_results]:
                formatted_results.append({
                    "title": article.get("title"),
                    "description": article.get("description"),
                    "url": article.get("url"),
                    "published_at": article.get("publishedAt"),
                    "source": article.get("source", {}).get("name"),
                    "content": article.get("content"),
                })

            return {
                "status": "success",
                "results": formatted_results,
                "total_results": data.get("totalResults", 0),
                "returned_count": len(formatted_results),
                "search_parameters": {
                    "keywords": query_info.get("keywords", []),
                    "from_date": from_date.strftime("%Y-%m-%d"),
                    "language": "en",
                },
            }
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def get_top_headlines(
        self, country: str = "us", category: str = None
    ) -> Dict[str, Any]:
        """Fetch top headlines.

        Args:
            country (str): Country code (default 'us' for the United States).
            category (str, optional): News category (e.g. business, technology).

        Returns:
            Dict[str, Any]: Top headlines.
        """
        params = {"country": country, "apiKey": self.api_key}
        if category:
            params["category"] = category

        url = f"{self.base_url}/top-headlines?{urlencode(params)}"
        response = requests.get(url)
        return response.json()
```
3.3 GeneralSearchAgent - General Search
GeneralSearchAgent class
```python
class GeneralSearchAgent:
    def __init__(self, serpapi_key: str = None):
        if serpapi_key:
            os.environ["SERPAPI_API_KEY"] = serpapi_key
        self.search = SerpAPIWrapper()

    def setup_search_parameters(self, query_info: Dict[str, Any]) -> List[str]:
        """Build search queries for a general search."""
        keywords = " ".join(query_info["keywords"])

        # Base search queries
        search_queries = [
            f"{keywords} lang:ko",  # Korean-language results
            keywords,               # general search
        ]
        return search_queries

    def perform_search(
        self, query_info: Dict[str, Any], max_results: int = 5
    ) -> Dict[str, Any]:
        """Run the general search and return the results."""
        try:
            search_queries = self.setup_search_parameters(query_info)
            all_results = []
            for query in search_queries:
                raw_results = self.search.run(query)
                all_results.extend(self._parse_general_results(raw_results))

            # Sort by relevance score
            sorted_results = sorted(
                all_results, key=lambda x: x.get("relevance_score", 0), reverse=True
            )[:max_results]

            return {
                "status": "success",
                "results": sorted_results,
                "total_found": len(all_results),
                "returned_count": len(sorted_results),
                "query_info": query_info,
            }
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def _parse_general_results(self, raw_results: str) -> List[Dict[str, Any]]:
        """Parse raw general-search output into structured results."""
        parsed_results = []
        for result in raw_results.split("\n"):
            if not result.strip():
                continue
            parsed_results.append({
                "type": "general",
                "title": self._extract_title(result),
                "content": result,
                "url": self._extract_url(result),
                "relevance_score": self._calculate_relevance(result),
            })
        return parsed_results

    def _extract_title(self, result: str) -> str:
        """Extract a title from the result text."""
        return result.split(".")[0].strip()[:100]

    def _extract_url(self, result: str) -> str:
        """Extract a URL from the result text."""
        import re

        urls = re.findall(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[^\s]*", result)
        return urls[0] if urls else ""

    def _calculate_relevance(self, result: str) -> float:
        """Compute a relevance score for a search result."""
        relevance_score = 0.5  # base score

        # Boost the score for each matched quality keyword
        keywords = ["official", "guide", "tutorial", "review", "recommendation"]
        lower_result = result.lower()
        for keyword in keywords:
            if keyword in lower_result:
                relevance_score += 0.1
        return min(relevance_score, 1.0)  # cap the score at 1.0
```
4. Response Agent
The Response Agent is the system's communication specialist, converting raw search results into well-structured, readable content that meets the user's needs.
ResponseAgent class
```python
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
import json


class ResponseAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
        self.setup_prompts()

    def setup_prompts(self):
        # Research paper prompt
        self.paper_prompt = PromptTemplate.from_template(
            """Please organize the following research paper search results in email format.

Search term: {query}
Search results: {results}

Format as follows:
1. Email subject: "Research Paper Search Results for [search term]"
2. Body:
   - Greeting
   - "Here are the organized research paper results for your search."
   - Number each paper and format as follows:
     1. Paper title: [title]
        - Summary: [core research content and key findings]
        - URL: [link]
   - Closing remarks
"""
        )

        # News prompt
        self.news_prompt = PromptTemplate.from_template(
            """Please organize the following news search results in email format.

Search term: {query}
Search results: {results}

Format as follows:
1. Email subject: "Latest News Updates for [search term]"
2. Body:
   - Greeting
   - "Here are the latest news articles related to your search topic."
   - Number each news item and format as follows:
     1. [title] - [news source]
        - Main content: [key content summary]
        - Published date: [date]
        - URL: [link]
   - Closing remarks
"""
        )

        # General search prompt
        self.general_prompt = PromptTemplate.from_template(
            """Please organize the following search results in email format.

Search term: {query}
Search results: {results}

Format as follows:
1. Email subject: "Search Results for [search term]"
2. Body:
   - Greeting
   - "Here are the organized results for your search."
   - Number each result and format as follows:
     1. [title]
        - Content: [main content summary]
        - Source: [website or platform name]
        - URL: [link]
   - Closing remarks
"""
        )

    def format_results(self, search_results):
        try:
            # Handle search errors
            if search_results.get("status") == "error":
                return {
                    "subject": "Search Error Notification",
                    "body": f"An error occurred during search: {search_results.get('error_message', 'Unknown error')}",
                }

            # Handle empty results
            if not search_results.get("results"):
                return {
                    "subject": "No Search Results",
                    "body": f"No results found for search term '{search_results.get('original_query', '')}'.",
                }

            # Pick the prompt matching the search type
            search_type = search_results.get("search_type", "general")
            if search_type == "research_paper":
                prompt = self.paper_prompt
            elif search_type == "news":
                prompt = self.news_prompt
            else:
                prompt = self.general_prompt

            # Prepare the formatting input
            formatted_input = {
                "query": search_results.get("original_query", ""),
                "results": json.dumps(
                    search_results.get("results", []), ensure_ascii=False, indent=2
                ),
            }

            # Generate the response with the LLM
            response = self.llm.invoke(prompt.format(**formatted_input))

            try:
                # Try to parse the response as JSON
                return json.loads(response.content)
            except json.JSONDecodeError:
                # Fall back to a default format if JSON parsing fails
                return {
                    "subject": f"Search Results for [{formatted_input['query']}]",
                    "body": response.content,
                }
        except Exception as e:
            return {
                "subject": "Result Processing Error",
                "body": (
                    f"An error occurred while processing results: {str(e)}\n\n"
                    f"Original query: {search_results.get('original_query', '')}"
                ),
            }
```
5. Scheduled Search System
The ScheduledSearchSystem manages the full lifecycle of a search task, from scheduling to result delivery.
ScheduledSearchSystem class
```python
import schedule
import time
import yagmail
import chromadb
import threading


class ScheduledSearchSystem:
    def __init__(self, email_config: Dict[str, Any]):
        print(f"\n[{self._get_current_time()}] Initializing system...")
        self.query_analyzer = QueryAnalysisAgent()
        self.search_router = SearchRouter()
        self.response_agent = ResponseAgent()
        self.client = chromadb.PersistentClient(path="./search_data")

        # Email configuration
        self.email_config = email_config
        self.yag = yagmail.SMTP(email_config["username"], email_config["password"])
        print(f"[{self._get_current_time()}] Email client configuration complete")

        self.scheduled_tasks = {}
        self.setup_collections()

        # Completion flag and event
        self.is_completed = False
        self.completion_event = threading.Event()

        # Start the scheduler thread
        self.scheduler_thread = threading.Thread(target=self._run_scheduler)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        print(f"[{self._get_current_time()}] System initialization complete\n")

    def _get_current_time(self):
        """Return the current time as a string."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def setup_collections(self):
        """Set up the ChromaDB collections."""
        print(f"[{self._get_current_time()}] Starting ChromaDB collection setup...")
        self.results_collection = self.client.get_or_create_collection(
            name="search_results", metadata={"description": "Raw search results"}
        )
        self.formatted_collection = self.client.get_or_create_collection(
            name="formatted_responses",
            metadata={"description": "Formatted responses for email delivery"},
        )
        self.schedule_collection = self.client.get_or_create_collection(
            name="scheduled_tasks",
            metadata={"description": "Scheduled search and email tasks"},
        )
        print(f"[{self._get_current_time()}] ChromaDB collection setup complete")

    def schedule_task(self, query: str, user_email: str) -> Dict[str, Any]:
        """Schedule a search-and-email task."""
        try:
            print(f"\n[{self._get_current_time()}] Starting new task scheduling...")
            print(f"Query: {query}")
            print(f"Email: {user_email}")

            # Analyze the query
            query_analysis = self.query_analyzer.analyze_query(query)
            execution_time = query_analysis["execution_time"]
            target_time = query_analysis["target_time"]
            print(f"Scheduled search execution time: {execution_time}")
            print(f"Scheduled email delivery time: {target_time}")

            # Generate a task ID
            schedule_id = f"task_{datetime.now(pytz.UTC).timestamp()}"
            print(f"Generated task ID: {schedule_id}")

            # Save the task information
            task_info = {
                "query": query,
                "email": user_email,
                "execution_time": execution_time.isoformat(),
                "target_time": target_time.isoformat(),
                "search_type": query_analysis["search_type"],
                "status": "scheduled",
            }
            self.scheduled_tasks[schedule_id] = task_info

            # Persist to ChromaDB
            print(f"[{self._get_current_time()}] Saving task information to ChromaDB...")
            self.schedule_collection.add(
                documents=[json.dumps(task_info)],
                metadatas=[{"type": "schedule", "status": "pending"}],
                ids=[schedule_id],
            )
            print(f"[{self._get_current_time()}] Task information saved")

            # Schedule the search execution
            # NOTE: `schedule` triggers on local wall-clock time; the extracted times are UTC
            execution_time_str = execution_time.strftime("%H:%M")
            schedule.every().day.at(execution_time_str).do(
                self.execute_search, schedule_id=schedule_id
            ).tag(schedule_id)
            print(f"[{self._get_current_time()}] Search task scheduling complete")

            return {
                "status": "success",
                "message": "Task successfully scheduled",
                "schedule_id": schedule_id,
                "execution_time": execution_time,
                "target_time": target_time,
            }
        except Exception as e:
            print(f"[{self._get_current_time()}] Task scheduling failed: {str(e)}")
            return {"status": "error", "error_message": str(e)}

    def execute_search(self, schedule_id: str) -> bool:
        """Execute the scheduled search."""
        try:
            print(f"\n[{self._get_current_time()}] Starting search execution (ID: {schedule_id})")
            task_info = self.scheduled_tasks.get(schedule_id)
            if not task_info:
                print(f"[{self._get_current_time()}] Task information not found")
                return False

            print(f"[{self._get_current_time()}] Analyzing search query...")
            query_analysis = self.query_analyzer.analyze_query(task_info["query"])

            print(f"[{self._get_current_time()}] Performing search...")
            search_results = self.search_router.route_and_search(query_analysis)

            print(f"[{self._get_current_time()}] Formatting search results...")
            formatted_response = self.response_agent.format_results(search_results)

            # Save the formatted results
            print(f"[{self._get_current_time()}] Saving search results to ChromaDB...")
            response_id = f"response_{schedule_id}"
            self.formatted_collection.add(
                documents=[json.dumps(formatted_response)],
                metadatas=[
                    {
                        "schedule_id": schedule_id,
                        "email": task_info["email"],
                        "target_time": task_info["target_time"],
                    }
                ],
                ids=[response_id],
            )
            print(f"[{self._get_current_time()}] Search results saved")

            # Schedule the email delivery
            target_time = datetime.fromisoformat(task_info["target_time"])
            target_time_str = target_time.strftime("%H:%M")
            schedule.every().day.at(target_time_str).do(
                self.send_email, schedule_id=schedule_id
            ).tag(f"email_{schedule_id}")
            print(f"[{self._get_current_time()}] Email delivery scheduled (Time: {target_time_str})")
            return True
        except Exception as e:
            print(f"[{self._get_current_time()}] Search execution failed: {str(e)}")
            return False

    def send_email(self, schedule_id: str) -> bool:
        """Send the result email."""
        try:
            print(f"\n[{self._get_current_time()}] Starting email delivery (ID: {schedule_id})")
            response_id = f"response_{schedule_id}"

            print(f"[{self._get_current_time()}] Retrieving saved search results...")
            response_results = self.formatted_collection.get(ids=[response_id])
            if not response_results["documents"]:
                print(f"[{self._get_current_time()}] Search results not found")
                return False

            formatted_response = json.loads(response_results["documents"][0])
            metadata = response_results["metadatas"][0]

            print(f"[{self._get_current_time()}] Sending email...")
            print(f"Recipient: {metadata['email']}")
            print(f"Subject: {formatted_response['subject']}")
            self.yag.send(
                to=metadata["email"],
                subject=formatted_response["subject"],
                contents=formatted_response["body"],
            )
            print(f"[{self._get_current_time()}] Email sent successfully")

            # Update the task status
            print(f"[{self._get_current_time()}] Updating task status...")
            task_info = self.scheduled_tasks[schedule_id]
            task_info["status"] = "completed"
            self.schedule_collection.update(
                documents=[json.dumps(task_info)], ids=[schedule_id]
            )

            # Clear the email schedule
            schedule.clear(f"email_{schedule_id}")
            print(f"[{self._get_current_time()}] Task completion processing complete\n")

            # Signal completion
            self.is_completed = True
            self.completion_event.set()
            print(f"[{self._get_current_time()}] All tasks completed. Shutting down system.\n")
            return True
        except Exception as e:
            print(f"[{self._get_current_time()}] Email delivery failed: {str(e)}")
            return False

    def _run_scheduler(self):
        """Run the scheduler loop."""
        print(f"[{self._get_current_time()}] Scheduler started...")
        while not self.is_completed:
            schedule.run_pending()
            time.sleep(1)  # check once per second

    def wait_for_completion(self, timeout=None):
        """Block until the task completes (or the timeout expires)."""
        try:
            completed = self.completion_event.wait(timeout=timeout)
            if not completed:
                print(f"[{self._get_current_time()}] Task completion timeout")
            if hasattr(self, "yag"):
                self.yag.close()
        except KeyboardInterrupt:
            print(f"\n[{self._get_current_time()}] Terminated by user.")
            if hasattr(self, "yag"):
                self.yag.close()
```
6. System Usage Example
System usage example
```python
# System configuration
email_config = {
    "username": "your_email@gmail.com",
    "password": "your_app_password",
}

print("\n=== Starting Scheduling System Test ===\n")

# Initialize the system
system = ScheduledSearchSystem(email_config)

# Schedule a task
result = system.schedule_task(
    query="find Modular RAG paper at 14:15 PM",  # example: 2:15 PM
    user_email="recipient@gmail.com",
)

print("\n=== Scheduling Result ===")
# `result` contains datetime objects, so serialize them with default=str
print(json.dumps(result, indent=2, ensure_ascii=False, default=str))

print("\n=== Task will execute at scheduled time... ===\n")

# Wait for the task to complete (up to 4 hours)
system.wait_for_completion(timeout=14400)
```
7. Test Function
Test function
```python
def datetime_handler(obj):
    """JSON serializer hook that converts datetime objects to strings.

    Args:
        obj: Object to convert.

    Returns:
        str: Formatted datetime string (format: YYYY-MM-DD HH:MM:SS+ZZZZ).

    Raises:
        TypeError: If the object is not a datetime instance.
    """
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S%z")
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")


def test_search_system():
    """Test the search system end to end."""
    # Initialize the components
    query_analyzer = QueryAnalysisAgent()
    search_router = SearchRouter()
    response_agent = ResponseAgent()

    # Test queries
    test_queries = [
        "recommend place to eat at seoul in 7 pm",
        "find rag persona paper at 3 pm",
        "find news us president speech in 7 am",
    ]

    for query in test_queries:
        print(f"\n{'='*60}")
        print(f"Test Query: {query}")
        print(f"{'='*60}")
        try:
            query_analysis = query_analyzer.analyze_query(query)
            print("\n1. Query Analysis Results:")
            print(
                json.dumps(
                    query_analysis,
                    indent=2,
                    ensure_ascii=False,
                    default=datetime_handler,
                )
            )
            if query_analysis["status"] != "success":
                print("Query analysis failed!")
                continue

            search_results = search_router.route_and_search(query_analysis)
            print("\n2. Search Results:")
            print(
                json.dumps(
                    search_results,
                    indent=2,
                    ensure_ascii=False,
                    default=datetime_handler,
                )
            )

            # Format the results
            print("\n3. Formatted Results:")
            formatted_results = response_agent.format_results(search_results)
            print(json.dumps(formatted_results, indent=2, ensure_ascii=False))
        except Exception as e:
            print(f"Error occurred during test: {str(e)}")


if __name__ == "__main__":
    test_search_system()
```
Results
The multi-agent scheduling system delivers the following:
- Natural language understanding: the system parses queries that contain time information, such as "find rag persona paper at 3 pm"
- Intelligent search routing: the most suitable search type (research paper, news, or general) is chosen automatically from the query content
- Timed task execution: the search runs automatically at the scheduled time, starting 5 minutes before the target time
- Result formatting: raw search results are converted into a structured, readable email format
- Automatic email delivery: the formatted results are sent to the specified mailbox at the target time
- Task status tracking: task information and search results are persisted with ChromaDB
Sample Output
Query analysis result
```json
{
  "target_time": "2025-02-08 15:00:00+0000",
  "execution_time": "2025-02-08 14:55:00+0000",
  "task_type": "search",
  "search_type": "research_paper",
  "keywords": ["rag", "persona", "paper", "3", "pm"],
  "requirements": "minimum 5 results",
  "time_sensitivity": "normal",
  "original_query": "find rag persona paper at 3 pm",
  "status": "success"
}
```
Formatted email content
```json
{
  "subject": "Search Results for [find rag persona paper at 3 pm]",
  "body": "Subject: Search Results for \"find rag persona paper at 3 pm\"\n\n---\n\nHello,\n\nHere are the organized research paper results for your search:\n\n1. Paper title: PeaCoK: Persona Commonsense Knowledge for Consistent and Engaging Narratives\n   - Summary: This paper introduces PeaCoK, a method for incorporating persona-based commonsense knowledge into narrative generation systems...\n   - URL: http://arxiv.org/abs/2305.02364v2\n\n2. Paper title: RAG vs. Fine-tuning: Pipelines, Tradeoffs, and a Case Study on Agriculture\n   - Summary: This paper presents a comprehensive comparison between RAG and fine-tuning approaches...\n   - URL: http://arxiv.org/abs/2401.15868\n\n...\n\nThank you for using our service. Should you have any further questions or need additional information, feel free to reach out.\n\nBest regards,\n\n[Your Name]"
}
```
Implementation Approach
The implementation of the multi-agent scheduling system is based on the following ideas:
1. Modular Design
The system uses a modular design that splits functionality into independent agent components:
- QueryAnalysisAgent: natural language understanding and query analysis
- SearchRouter: routing queries to the appropriate search agent by type
- Specialized search agents: executing specific types of search tasks
- ResponseAgent: result formatting and content generation
- ScheduledSearchSystem: task scheduling and overall coordination
2. Agent Collaboration
The agents collaborate through standardized data formats and interfaces:
- Query analysis results are passed as JSON
- A unified search-result format simplifies downstream processing
- A standardized email format ensures a consistent user experience
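One way to make that collaboration contract explicit is to type the payload that flows between agents. A minimal sketch (the `QueryAnalysis` name is hypothetical; the fields are the ones the QueryAnalysisAgent above produces):

```python
from typing import List, TypedDict


# Hypothetical schema for the JSON payload passed between agents;
# field names match the QueryAnalysisAgent output shown earlier.
class QueryAnalysis(TypedDict):
    task_type: str         # e.g. "search"
    search_type: str       # "research_paper" | "news" | "general"
    keywords: List[str]
    requirements: str
    time_sensitivity: str  # "urgent" | "normal"
    original_query: str
    status: str


analysis: QueryAnalysis = {
    "task_type": "search",
    "search_type": "research_paper",
    "keywords": ["rag", "persona"],
    "requirements": "minimum 5 results",
    "time_sensitivity": "normal",
    "original_query": "find rag persona paper at 3 pm",
    "status": "success",
}
print(analysis["search_type"])  # research_paper
```

A typed contract like this lets static checkers catch a misspelled or missing field before it silently falls through an agent's `except` fallback at runtime.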
3. Time Management
The system implements a precise time-management mechanism:
- Time information is extracted from the natural language query
- The execution time is set 5 minutes before the target time
- The Schedule library handles timed task execution
- Both relative times (e.g. "7 AM") and absolute times are supported
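The core of this mechanism can be sketched with the standard library alone. This mirrors the rollover and 5-minute-lead rules from `QueryAnalysisAgent._parse_relative_time` above (the `parse_clock_time` helper is illustrative, not part of the system):

```python
import re
from datetime import datetime, timedelta, timezone


def parse_clock_time(text: str, now: datetime) -> datetime:
    """Resolve expressions like "7 AM" or "14:15" to the next occurrence after `now`."""
    m = re.search(r"(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", text, re.IGNORECASE)
    if not m:
        return now + timedelta(hours=1)  # fallback: one hour from now

    hour = int(m.group(1))
    minute = int(m.group(2) or 0)
    period = (m.group(3) or "").lower()

    # Handle AM/PM
    if period == "pm" and hour != 12:
        hour += 12
    elif period == "am" and hour == 12:
        hour = 0

    # Target time today; roll over to tomorrow if already past
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target < now:
        target += timedelta(days=1)
    return target


now = datetime(2025, 2, 8, 12, 0, tzinfo=timezone.utc)
target = parse_clock_time("3 pm", now)
execution = target - timedelta(minutes=5)  # search runs 5 minutes early
print(target.hour, execution.strftime("%H:%M"))  # 15 14:55
```

Note that "7 AM" evaluated at noon resolves to 7 AM the next day, which is exactly the rollover behavior the agent relies on.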
4. Data Persistence
ChromaDB provides data persistence:
- Task information and status are stored
- Search results and formatted responses are saved
- Task status tracking and historical queries are supported
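The persistence pattern itself is independent of the backing store. Here is a minimal sketch of the same add/get/update contract over sqlite3, used as a stand-in so the example runs without chromadb (the `TaskStore` class is hypothetical, not part of the system):

```python
import json
import sqlite3


# Stand-in for the ChromaDB collections above: documents are JSON strings
# keyed by a task ID, and an add with an existing ID acts as an update.
class TaskStore:
    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS tasks (id TEXT PRIMARY KEY, doc TEXT)"
        )

    def add(self, task_id: str, doc: dict) -> None:
        self.db.execute(
            "INSERT OR REPLACE INTO tasks VALUES (?, ?)", (task_id, json.dumps(doc))
        )
        self.db.commit()

    def get(self, task_id: str) -> dict:
        row = self.db.execute(
            "SELECT doc FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        return json.loads(row[0]) if row else {}


store = TaskStore()
store.add("task_1", {"query": "find Modular RAG paper", "status": "scheduled"})
store.add("task_1", {"query": "find Modular RAG paper", "status": "completed"})  # update
print(store.get("task_1")["status"])  # completed
```

Because tasks survive as documents rather than in-process objects, a restarted scheduler could in principle re-load pending tasks, which is the main payoff of persisting them at all.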
5. Asynchronous Processing
The system uses an asynchronous processing mechanism:
- Threads enable non-blocking operation
- The scheduler runs continuously in the background
- Multiple tasks can be scheduled concurrently
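The thread-and-event structure behind this can be sketched as follows. `MiniScheduler` is a toy stand-in for `ScheduledSearchSystem`'s scheduler thread and completion event (all names here are illustrative):

```python
import threading
import time


class MiniScheduler:
    def __init__(self):
        self.done = threading.Event()  # plays the role of completion_event
        self.ran = []
        self.thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self):
        # Poll for due work until completion is signalled,
        # like the loop around schedule.run_pending() above
        while not self.done.is_set():
            self.ran.append("tick")  # stand-in for running pending jobs
            time.sleep(0.01)

    def start(self):
        self.thread.start()

    def wait_for_completion(self, timeout=None):
        self.done.wait(timeout=timeout)


sched = MiniScheduler()
sched.start()
time.sleep(0.05)   # let the loop run a few times
sched.done.set()   # a finished task would set this event
sched.thread.join(timeout=1)
print(len(sched.ran) > 0)  # True
```

The daemon flag matters: if the main thread exits early, the scheduler thread is torn down with it instead of keeping the process alive forever.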
6. Error Handling
Comprehensive error handling is in place:
- Every component catches and handles exceptions
- Meaningful error messages are returned
- This keeps the system stable and reliable
Extension Suggestions
Building on the current implementation, the following extensions are worth considering:
1. Enhanced Search Capabilities
- Add more specialized search agents (e.g. patent search, code search)
- Improve deduplication and relevance ranking of search results
- Support multilingual search and result translation
- Add a search-result cache to improve response time
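As a sketch of the caching idea, results for identical queries could be reused within a TTL window so repeated scheduled runs skip the network call. All names here are hypothetical; a real version would wrap the search agents' `perform_search` methods:

```python
import time
from functools import lru_cache

CALLS = {"count": 0}  # instrumentation to show cache hits


@lru_cache(maxsize=128)
def cached_search(search_type: str, query: str, time_bucket: int):
    CALLS["count"] += 1  # stands in for the real API request
    return f"results for {search_type}:{query}"


def search(search_type: str, query: str, ttl_seconds: int = 300):
    # Identical (type, query) pairs share a cache entry within each
    # ttl_seconds window; a new bucket value expires old entries.
    bucket = int(time.time() // ttl_seconds)
    return cached_search(search_type, query, bucket)


search("news", "ai regulation")
search("news", "ai regulation")  # served from cache, no second "API call"
print(CALLS["count"])
```

Folding the TTL into a cache-key argument keeps the sketch to a plain `lru_cache`; a production cache would also bound staleness explicitly and likely live in the persistent store rather than process memory.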
2. Stronger Agent Capabilities
- Strengthen QueryAnalysisAgent's time parsing to handle more complex time expressions
- Improve ResponseAgent to support more output formats (e.g. PDF, Word)
- Add result summarization and key-information extraction
- Implement personalized result ranking based on user preferences
3. Extended Scheduling Features
- Support recurring tasks (e.g. daily, weekly)
- Add task priority management and resource allocation
- Implement task dependencies and complex workflows
- Add execution monitoring and performance statistics
4. Richer Notification Channels
- Support additional notification channels (e.g. SMS, instant messaging)
- Add customizable notification templates
- Implement notification priorities and grouping
- Add notification history and statistics
5. User Interface and Interaction
- Build a web interface for graphical task management
- Add real-time task progress display
- Implement user authentication and access control
- Provide an API for third-party integration
6. System Optimization
- Move to a distributed architecture for better scalability
- Add load balancing and failover
- Optimize resource usage for better efficiency
- Add comprehensive logging and monitoring
Summary
The multi-agent scheduling system is a comprehensive AI application case study. It shows how multiple agent components can work together to automate the full pipeline from natural language query to scheduled information retrieval and email delivery.
The core value of this case lies in:
- Agent collaboration: multiple specialized agents cooperate to automate a complex task
- Natural language understanding: an LLM parses the query and extracts the key information
- Task scheduling: precise time management and scheduling meet the user's timing requirements
- Result processing: raw search results are converted into a user-friendly format and delivered by email
The system applies to many scenarios, such as:
- Researchers receiving the latest papers on a schedule
- Business users receiving periodic industry news
- Individuals receiving topic-specific information at set times
- Enterprises automating information collection and distribution
With further extension and optimization, the system can grow into a powerful information-management and automation tool that delivers personalized information services.