1. Introduction: The Data Value Behind Weibo Hot Search
As one of China's largest social media platforms, Weibo maintains a hot search list that reflects trending events, public attention, and shifts in opinion in real time. For public opinion monitoring, market research, content creation, and similar fields, this data is highly valuable. This article walks through building an efficient Weibo hot search collection system with modern Python crawling techniques, using asynchronous requests and API reverse engineering to capture the hot search list on a schedule and to collect topic content in depth.
2. Technology Stack Overview: A Modern Crawler Toolkit
Before writing any code, here are the core technologies used in this project (a short async request sketch follows the list):
HTTP clients:
httpx: an asynchronous HTTP client with HTTP/2 support, generally faster than the traditional requests library
aiohttp: an asynchronous HTTP client/server framework
Asynchronous programming:
asyncio: Python's built-in asynchronous IO framework
anyio: a more unified API layer over async backends
Data parsing:
parsel: data extraction based on XPath and CSS selectors
json: handling the JSON returned by the API
Scheduling and storage:
apscheduler: an advanced Python scheduler
sqlalchemy: SQL toolkit and ORM
redis: caching and message queue
Anti-anti-crawling measures:
Request header rotation
IP proxy pool
Browser fingerprint simulation
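As a quick taste of the async style used throughout this project, the snippet below fetches a URL with httpx inside an asyncio event loop. It is a minimal sketch; the URL is a placeholder, and the real project code appears in Section 4.

```python
# async_demo.py - minimal httpx + asyncio example (illustrative only)
import asyncio
import httpx

async def fetch(url: str) -> int:
    # An AsyncClient reuses connections; "async with" closes it cleanly.
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.get(url)
        return response.status_code

if __name__ == "__main__":
    # Placeholder URL; replace with a real endpoint when testing.
    print(asyncio.run(fetch("https://example.com")))
```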
3. Project Architecture
```text
Weibo Hot Search Collection System
├── Data collection layer (Async Spider)
│   ├── Hot search list crawling module
│   ├── Deep topic content collection module
│   └── Anti-anti-crawling module
├── Data storage layer (Database)
│   ├── Hot search table
│   ├── Topic content table
│   └── User profile table (optional)
├── Task scheduling layer (Scheduler)
│   ├── Scheduled triggers
│   ├── Failure retries
│   └── Task monitoring
└── Data API layer (API)
    ├── Real-time data queries
    ├── Historical data analysis
    └── Data export
```
4. Core Code Implementation
4.1 Environment Setup and Dependency Installation
```text
# requirements.txt
httpx[http2]>=0.24.0      # the [http2] extra is needed because the spider enables http2=True
aiohttp>=3.8.0
anyio>=3.0.0
parsel>=1.8.0
apscheduler>=3.10.0
sqlalchemy>=2.0.0
aiosqlite>=0.19.0         # async SQLite driver for the sqlite+aiosqlite:// URL
redis>=4.5.0
pydantic>=2.0.0
pydantic-settings>=2.0.0  # BaseSettings lives in this package in pydantic v2
tenacity>=8.2.0           # retry decorator used in spider.py
fastapi>=0.100.0          # data API layer
uvicorn>=0.23.0           # ASGI server for the API
pandas>=2.0.0
playwright>=1.35.0        # for pages that need full JS rendering
```
4.2 Configuration Design
```python
# config.py
from pydantic_settings import BaseSettings
from typing import List, Optional


class Settings(BaseSettings):
    # Weibo API endpoints
    WEIBO_HOT_API: str = "https://weibo.com/ajax/statuses/hot_band"
    WEIBO_TOPIC_API: str = "https://weibo.com/ajax/statuses/topic_band"
    WEIBO_DETAIL_API: str = "https://weibo.com/ajax/statuses/show"

    # Request settings
    REQUEST_TIMEOUT: int = 30
    CONCURRENT_LIMIT: int = 10
    RETRY_TIMES: int = 3

    # Proxy settings
    PROXY_ENABLED: bool = False
    PROXY_POOL: List[str] = []

    # Database settings
    DATABASE_URL: str = "sqlite+aiosqlite:///weibo_hot.db"
    REDIS_URL: str = "redis://localhost:6379/0"

    # Scheduling settings (seconds)
    COLLECTION_INTERVAL: int = 300   # every 5 minutes
    DEEP_CRAWL_INTERVAL: int = 3600  # every hour

    # Browser emulation settings
    USE_BROWSER: bool = False
    BROWSER_PATH: Optional[str] = None

    class Config:
        env_file = ".env"


settings = Settings()
```
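For reference, a .env file matching the Settings class above might look like the following. All values shown are examples; any field not set here falls back to the defaults defined in config.py.

```text
# .env (example values, adjust to your environment)
DATABASE_URL=sqlite+aiosqlite:///weibo_hot.db
REDIS_URL=redis://localhost:6379/0
COLLECTION_INTERVAL=300
PROXY_ENABLED=false
```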
4.3 Data Model Definitions
```python
# models.py
from sqlalchemy import Column, Integer, String, DateTime, Text, JSON
from sqlalchemy.orm import declarative_base
from datetime import datetime

Base = declarative_base()


class HotSearch(Base):
    """Hot search list data model"""
    __tablename__ = "hot_searches"

    id = Column(Integer, primary_key=True, autoincrement=True)
    rank = Column(Integer, nullable=False, comment="Rank")
    keyword = Column(String(255), nullable=False, index=True, comment="Keyword")
    url = Column(String(500), comment="Topic URL")
    tag = Column(String(50), comment="Label such as 热 (hot), 新 (new), 爆 (boom)")
    hot_value = Column(Integer, comment="Heat value")
    category = Column(String(100), comment="Category")
    created_at = Column(DateTime, default=datetime.now, index=True)
    raw_data = Column(JSON, comment="Raw API data")

    def __repr__(self):
        return f"<HotSearch(rank={self.rank}, keyword={self.keyword}, hot_value={self.hot_value})>"


class TopicContent(Base):
    """Topic content data model"""
    __tablename__ = "topic_contents"

    id = Column(Integer, primary_key=True, autoincrement=True)
    topic_id = Column(String(100), nullable=False, index=True)
    keyword = Column(String(255), nullable=False, index=True)
    title = Column(String(500), comment="Topic title")
    description = Column(Text, comment="Topic description")
    read_count = Column(Integer, comment="Read count")
    discuss_count = Column(Integer, comment="Discussion count")
    origin_url = Column(String(500), comment="Original link")
    top_posts = Column(JSON, comment="Top Weibo posts")
    related_topics = Column(JSON, comment="Related topics")
    crawl_time = Column(DateTime, default=datetime.now, index=True)

    def __repr__(self):
        return f"<TopicContent(keyword={self.keyword}, read={self.read_count})>"


class CrawlLog(Base):
    """Crawl log"""
    __tablename__ = "crawl_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    task_type = Column(String(50), nullable=False)
    status = Column(String(20), nullable=False)  # success, failed, partial
    items_count = Column(Integer, default=0)
    error_message = Column(Text)
    created_at = Column(DateTime, default=datetime.now, index=True)
```

4.4 Asynchronous Spider Core Class
```python
# spider.py
import asyncio
import httpx
from typing import Dict, List, Optional, Any
import json
import random
from datetime import datetime
from dataclasses import dataclass
from contextlib import asynccontextmanager
from tenacity import retry, stop_after_attempt, wait_exponential


@dataclass
class RequestConfig:
    """Request configuration data class"""
    headers: Dict[str, str]
    timeout: float = 30.0
    verify_ssl: bool = True
    follow_redirects: bool = True


class WeiboHotSearchSpider:
    """Asynchronous Weibo hot search spider"""

    def __init__(self, config=None):
        self.config = config or {}
        self.client = None
        # The User-Agent pool must exist before _create_request_config() uses it
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]
        self.request_config = self._create_request_config()

    def _create_request_config(self) -> RequestConfig:
        """Build the request configuration"""
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }
        return RequestConfig(headers=headers)

    @asynccontextmanager
    async def get_client(self):
        """Yield an asynchronous HTTP client"""
        limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
        timeout = httpx.Timeout(timeout=self.request_config.timeout)
        async with httpx.AsyncClient(
            headers=self.request_config.headers,
            timeout=timeout,
            limits=limits,
            verify=self.request_config.verify_ssl,
            follow_redirects=self.request_config.follow_redirects,
            http2=True,  # enable HTTP/2
        ) as client:
            yield client

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def fetch_hot_search(self) -> Dict[str, Any]:
        """
        Fetch the Weibo hot search list.
        Uses the official Weibo API so no HTML parsing is needed.
        """
        api_url = "https://weibo.com/ajax/statuses/hot_band"
        async with self.get_client() as client:
            try:
                # Random delay to avoid requesting too frequently
                await asyncio.sleep(random.uniform(1, 3))
                response = await client.get(
                    api_url,
                    params={
                        "source": "weibo",
                        "type": "uid",
                        "uid": "",
                        "_t": int(datetime.now().timestamp() * 1000)
                    }
                )
                response.raise_for_status()
                data = response.json()
                return self._parse_hot_search_data(data)
            except httpx.RequestError as e:
                print(f"Request failed: {e}")
                raise
            except json.JSONDecodeError as e:
                print(f"JSON decoding failed: {e}")
                raise

    def _parse_hot_search_data(self, raw_data: Dict) -> Dict[str, Any]:
        """Parse the raw hot search list data"""
        result = {
            "timestamp": datetime.now().isoformat(),
            "total": 0,
            "hot_searches": [],
            "topics": [],
            "raw_data": raw_data
        }
        if "data" not in raw_data or "band_list" not in raw_data["data"]:
            return result

        band_list = raw_data["data"]["band_list"]
        result["total"] = len(band_list)

        for item in band_list:
            hot_item = {
                "rank": item.get("rank", 0),
                "keyword": item.get("word", ""),
                "url": f"https://s.weibo.com/weibo?q={item.get('word_scheme', '')}",
                "tag": item.get("label_name", ""),
                "hot_value": item.get("num", 0),
                "category": item.get("category", ""),
                "is_ad": item.get("is_ad", 0) == 1,
                "note": item.get("note", ""),
                "real_pos": item.get("realpos", 0),
                # Used downstream by the database layer as created_at / dedup reference
                "timestamp": datetime.now(),
                "raw_item": item
            }
            result["hot_searches"].append(hot_item)

            # Extract topic information
            if "mblog" in item and item["mblog"]:
                result["topics"].append({
                    "topic_id": item["mblog"].get("id", ""),
                    "keyword": hot_item["keyword"],
                    "title": item["mblog"].get("text", "")[:100]
                })
        return result

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=5))
    async def fetch_topic_detail(self, topic_id: str, keyword: str) -> Optional[Dict]:
        """
        Fetch detailed information about a topic.
        Uses the Weibo topic API.
        """
        api_url = "https://weibo.com/ajax/statuses/topic_band"
        async with self.get_client() as client:
            try:
                await asyncio.sleep(random.uniform(0.5, 1.5))
                params = {
                    "q": keyword,
                    "page": 1,
                    "count": 20,
                    "_t": int(datetime.now().timestamp() * 1000)
                }
                response = await client.get(api_url, params=params)
                response.raise_for_status()
                data = response.json()
                return self._parse_topic_data(data, keyword)
            except Exception as e:
                print(f"Failed to fetch topic detail for {keyword}: {e}")
                return None

    def _parse_topic_data(self, raw_data: Dict, keyword: str) -> Dict[str, Any]:
        """Parse topic data"""
        result = {
            "keyword": keyword,
            "crawl_time": datetime.now().isoformat(),
            "read_count": 0,
            "discuss_count": 0,
            "top_posts": [],
            "related_topics": [],
            "raw_data": raw_data
        }
        if "data" not in raw_data:
            return result

        data = raw_data["data"]

        # Read and discussion counts
        if "cardlistInfo" in data:
            card_info = data["cardlistInfo"]
            result["read_count"] = card_info.get("total", 0)
            result["discuss_count"] = card_info.get("page_size", 0)

        # Extract top posts
        if "cards" in data:
            for card in data["cards"]:
                if "mblog" in card:
                    blog = card["mblog"]
                    post = {
                        "id": blog.get("id", ""),
                        "text": blog.get("text", ""),
                        "user": blog.get("user", {}).get("screen_name", ""),
                        "reposts": blog.get("reposts_count", 0),
                        "comments": blog.get("comments_count", 0),
                        "attitudes": blog.get("attitudes_count", 0),
                        "created_at": blog.get("created_at", "")
                    }
                    result["top_posts"].append(post)
        return result

    async def fetch_multiple_topics(self, topics: List[Dict], max_concurrent: int = 5) -> List[Dict]:
        """Fetch several topic details concurrently"""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_with_semaphore(topic: Dict) -> Optional[Dict]:
            async with semaphore:
                return await self.fetch_topic_detail(
                    topic.get("topic_id", ""),
                    topic.get("keyword", "")
                )

        tasks = [fetch_with_semaphore(topic) for topic in topics]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out failed results
        valid_results = []
        for result in results:
            if isinstance(result, dict):
                valid_results.append(result)
            elif result is not None:
                print(f"Task returned an exception: {type(result).__name__}")
        return valid_results
```
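Assuming the class above is saved as spider.py, a one-off run for a quick sanity check might look like the following minimal sketch; the printed fields are the ones produced by _parse_hot_search_data.

```python
# run_spider_once.py - quick manual test of WeiboHotSearchSpider (illustrative)
import asyncio
from spider import WeiboHotSearchSpider

async def main():
    spider = WeiboHotSearchSpider()
    data = await spider.fetch_hot_search()
    # Print the top ten entries of the parsed hot search list.
    for item in data["hot_searches"][:10]:
        print(item["rank"], item["keyword"], item["hot_value"])

if __name__ == "__main__":
    asyncio.run(main())
```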
4.5 Database Access Layer

```python
# database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import select, desc
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List, Dict
from datetime import datetime

from models import Base, HotSearch, TopicContent, CrawlLog


class DatabaseManager:
    """Asynchronous database manager"""

    def __init__(self, database_url: str):
        self.engine = create_async_engine(
            database_url,
            echo=False,
            pool_pre_ping=True,
            pool_recycle=3600,
        )
        self.async_session = async_sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False
        )

    async def init_db(self):
        """Create the database tables"""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    @asynccontextmanager
    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a database session"""
        async with self.async_session() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

    async def save_hot_searches(self, hot_searches: List[Dict]) -> int:
        """Save hot search entries"""
        saved_count = 0
        async with self.get_session() as session:
            for item in hot_searches:
                # The spider attaches a datetime "timestamp"; fall back to now() if missing
                crawl_time = item.get("timestamp", datetime.now())
                # Skip keywords that already exist at the same point in time
                existing = await session.execute(
                    select(HotSearch).where(
                        HotSearch.keyword == item["keyword"],
                        HotSearch.created_at >= crawl_time
                    )
                )
                if existing.scalar_one_or_none() is None:
                    hot_search = HotSearch(
                        rank=item["rank"],
                        keyword=item["keyword"],
                        url=item.get("url", ""),
                        tag=item.get("tag", ""),
                        hot_value=item.get("hot_value", 0),
                        category=item.get("category", ""),
                        created_at=crawl_time,
                        raw_data=item.get("raw_item", {})
                    )
                    session.add(hot_search)
                    saved_count += 1
            await session.commit()
        return saved_count

    async def save_topic_contents(self, topics: List[Dict]) -> int:
        """Save topic contents"""
        saved_count = 0
        async with self.get_session() as session:
            for topic in topics:
                # Build a topic_id from the keyword and the current hour
                topic_id = f"{topic['keyword']}_{datetime.now().strftime('%Y%m%d%H')}"
                crawl_time = topic.get("crawl_time", datetime.now())
                if isinstance(crawl_time, str):
                    # _parse_topic_data stores crawl_time as an ISO string
                    crawl_time = datetime.fromisoformat(crawl_time)
                topic_content = TopicContent(
                    topic_id=topic_id,
                    keyword=topic["keyword"],
                    title=topic.get("title", "")[:200],
                    description=topic.get("description", ""),
                    read_count=topic.get("read_count", 0),
                    discuss_count=topic.get("discuss_count", 0),
                    origin_url=topic.get("url", ""),
                    top_posts=topic.get("top_posts", []),
                    related_topics=topic.get("related_topics", []),
                    crawl_time=crawl_time
                )
                session.add(topic_content)
                saved_count += 1
            await session.commit()
        return saved_count

    async def get_recent_hot_searches(self, limit: int = 50) -> List[Dict]:
        """Fetch the most recent hot search rows"""
        async with self.get_session() as session:
            result = await session.execute(
                select(HotSearch)
                .order_by(desc(HotSearch.created_at), HotSearch.rank)
                .limit(limit)
            )
            searches = result.scalars().all()
            return [{
                "rank": s.rank,
                "keyword": s.keyword,
                "hot_value": s.hot_value,
                "tag": s.tag,
                "created_at": s.created_at.isoformat()
            } for s in searches]

    async def log_crawl_task(self, task_type: str, status: str,
                             items_count: int = 0, error_msg: str = ""):
        """Record a crawl log entry"""
        async with self.get_session() as session:
            log = CrawlLog(
                task_type=task_type,
                status=status,
                items_count=items_count,
                error_message=error_msg
            )
            session.add(log)
            await session.commit()
```

4.6 Scheduled Task Scheduler
```python
# scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from typing import Callable, Dict
import asyncio
from datetime import datetime
import logging


class TaskScheduler:
    """Asynchronous task scheduler"""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.logger = logging.getLogger(__name__)

    def add_interval_task(self, func: Callable, seconds: int,
                          args: tuple = (), kwargs: Dict = None):
        """Add a fixed-interval job"""
        trigger = IntervalTrigger(seconds=seconds)
        self.scheduler.add_job(
            func,
            trigger,
            args=args,
            kwargs=kwargs or {},
            id=f"{func.__name__}_{datetime.now().timestamp()}",
            replace_existing=True
        )
        self.logger.info(f"Added interval job: {func.__name__}, every {seconds} seconds")

    def add_cron_task(self, func: Callable, hour: str = "*", minute: str = "0",
                      args: tuple = (), kwargs: Dict = None):
        """Add a cron job"""
        trigger = CronTrigger(hour=hour, minute=minute)
        self.scheduler.add_job(
            func,
            trigger,
            args=args,
            kwargs=kwargs or {},
            id=f"{func.__name__}_cron",
            replace_existing=True
        )
        self.logger.info(f"Added cron job: {func.__name__}, at {hour}:{minute}")

    async def start(self):
        """Start the scheduler and keep the event loop alive"""
        if not self.scheduler.running:
            self.scheduler.start()
            self.logger.info("Task scheduler started")
        # Keep the main program running
        try:
            while True:
                await asyncio.sleep(3600)  # wake up once an hour
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()

    def shutdown(self):
        """Shut down the scheduler"""
        if self.scheduler.running:
            self.scheduler.shutdown()
            self.logger.info("Task scheduler stopped")
```

4.7 Main Program Integration
```python
# main.py
import asyncio
import signal
import sys
import logging
from typing import Dict, List
from contextlib import AsyncExitStack

from config import settings
from spider import WeiboHotSearchSpider
from database import DatabaseManager
from scheduler import TaskScheduler

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('weibo_crawler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class WeiboHotSearchCollector:
    """Weibo hot search collection main program"""

    def __init__(self):
        self.spider = WeiboHotSearchSpider()
        self.db = DatabaseManager(settings.DATABASE_URL)
        self.scheduler = TaskScheduler()
        self.is_running = True
        self.cached_topics: List[Dict] = []
        # Register signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Signal handler"""
        logger.info(f"Received signal {signum}, shutting down...")
        self.is_running = False

    async def initialize(self):
        """Initialize the system"""
        logger.info("Initializing the Weibo hot search collection system...")
        # Initialize the database
        await self.db.init_db()
        logger.info("Database initialized")
        # Register scheduled jobs
        self.scheduler.add_interval_task(
            self.collect_hot_searches,
            settings.COLLECTION_INTERVAL
        )
        # Deep collection once per hour
        self.scheduler.add_cron_task(
            self.collect_topic_details,
            hour="*",
            minute="30"
        )
        logger.info("Scheduled jobs registered")

    async def collect_hot_searches(self):
        """Collect the hot search list"""
        logger.info("Collecting the Weibo hot search list...")
        try:
            # Fetch hot search data
            hot_data = await self.spider.fetch_hot_search()
            # Persist to the database
            saved_count = await self.db.save_hot_searches(hot_data["hot_searches"])
            # Record the crawl log
            await self.db.log_crawl_task("hot_search", "success", saved_count)
            logger.info(f"Hot search collection finished, {saved_count} rows saved")

            # Extract topics for the later deep crawl
            topics_for_deep_crawl = []
            for item in hot_data["hot_searches"][:10]:  # only the top 10 are crawled in depth
                if "raw_item" in item and "mblog" in item["raw_item"]:
                    topics_for_deep_crawl.append({
                        "keyword": item["keyword"],
                        "topic_id": item["raw_item"]["mblog"].get("id", ""),
                        "rank": item["rank"]
                    })
            # Cache topics for the deep crawl; Redis or an in-memory cache would also work
            self.cached_topics = topics_for_deep_crawl
        except Exception as e:
            logger.error(f"Hot search collection failed: {e}")
            await self.db.log_crawl_task("hot_search", "failed", error_msg=str(e))

    async def collect_topic_details(self):
        """Deep collection of topic details"""
        if not self.cached_topics:
            logger.warning("No topics available for deep collection")
            return
        logger.info(f"Deep-collecting {len(self.cached_topics)} topics...")
        try:
            # Fetch topic details concurrently
            topic_details = await self.spider.fetch_multiple_topics(
                self.cached_topics, max_concurrent=3
            )
            # Persist topic contents
            saved_count = await self.db.save_topic_contents(topic_details)
            # Record the crawl log
            await self.db.log_crawl_task("topic_detail", "success", saved_count)
            logger.info(f"Topic detail collection finished, {saved_count} rows saved")
        except Exception as e:
            logger.error(f"Topic detail collection failed: {e}")
            await self.db.log_crawl_task("topic_detail", "failed", error_msg=str(e))

    async def run_once(self):
        """Run all collection tasks once"""
        logger.info("Running a single collection pass...")
        await self.collect_hot_searches()
        await asyncio.sleep(5)  # wait 5 seconds
        await self.collect_topic_details()
        logger.info("Single collection pass finished")

    async def run_scheduled(self):
        """Run the scheduled collection loop"""
        logger.info("Starting scheduled collection...")
        await self.initialize()
        await self.scheduler.start()

    async def cleanup(self):
        """Release resources"""
        logger.info("Cleaning up resources...")
        # Additional cleanup logic can go here
        logger.info("Cleanup finished")


async def main():
    """Entry point"""
    collector = WeiboHotSearchCollector()
    # AsyncExitStack is reserved for managing future resources
    async with AsyncExitStack() as stack:
        try:
            # Choose the run mode from the command line
            if len(sys.argv) > 1 and sys.argv[1] == "--once":
                await collector.run_once()
            else:
                await collector.run_scheduled()
            # Keep the program alive
            while collector.is_running:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            await collector.cleanup()
    logger.info("Program exited normally")


if __name__ == "__main__":
    # Check the Python version
    if sys.version_info < (3, 8):
        print("Python 3.8 or newer is required")
        sys.exit(1)
    # Run the main program
    asyncio.run(main())
```

4.8 Enhanced Anti-Anti-Crawling Strategies
```python
# anti_anti_spider.py
import random
import time
import hashlib
from typing import Dict


class AntiAntiSpider:
    """Enhanced anti-anti-crawling helpers"""

    def __init__(self):
        self.cookie_pool = []
        self.proxy_pool = []
        self.last_request_time = 0
        self.min_request_interval = 1.0  # minimum interval between requests (seconds)

    def generate_fingerprint(self) -> Dict[str, str]:
        """Generate a browser-like header fingerprint"""
        return {
            "User-Agent": self.get_random_user_agent(),
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        }

    def get_random_user_agent(self) -> str:
        """Pick a random User-Agent"""
        user_agents = [
            # Chrome
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            # Firefox
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/120.0",
            # Safari
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
        ]
        return random.choice(user_agents)

    def add_delay(self):
        """Throttle requests with a randomized delay"""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.min_request_interval:
            sleep_time = self.min_request_interval - elapsed + random.uniform(0.5, 2.0)
            time.sleep(sleep_time)
        self.last_request_time = time.time()

    def generate_signature(self, params: Dict, secret: str = "weibo") -> str:
        """Generate a request signature (mimics a Weibo-style signature)"""
        # Sort the parameters and join them
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
        # Append the secret and hash with MD5
        sign_str = param_str + secret
        return hashlib.md5(sign_str.encode()).hexdigest()

    def rotate_proxy(self) -> Dict[str, str]:
        """Rotate to a random proxy"""
        if not self.proxy_pool:
            return {}
        proxy = random.choice(self.proxy_pool)
        return {
            "http": proxy,
            "https": proxy
        }

    def simulate_human_behavior(self):
        """Simulate human-like pauses"""
        # Randomized pauses standing in for mouse moves, clicks and scrolling
        behaviors = [
            lambda: time.sleep(random.uniform(0.1, 0.5)),
            lambda: None,  # placeholder for a simulated click
            lambda: None,  # placeholder for a simulated scroll
        ]
        random.choice(behaviors)()
```

4.9 Data Export and API Endpoints
```python
# api.py
import os
from datetime import datetime

import pandas as pd
from fastapi import FastAPI, HTTPException, Query, BackgroundTasks
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware

from database import DatabaseManager
from config import settings

app = FastAPI(title="Weibo Hot Search Data API", version="1.0.0")

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global database manager
db_manager = None


@app.on_event("startup")
async def startup():
    """Startup event"""
    global db_manager
    db_manager = DatabaseManager(settings.DATABASE_URL)
    await db_manager.init_db()


@app.get("/")
async def root():
    """Root path"""
    return {"message": "Weibo Hot Search Data API", "status": "running"}


@app.get("/api/hot-searches/recent")
async def get_recent_hot_searches(
    limit: int = Query(50, ge=1, le=1000),
    hours: int = Query(24, ge=1, le=720)
):
    """Return the most recent hot searches"""
    try:
        searches = await db_manager.get_recent_hot_searches(limit)
        return {
            "code": 0,
            "message": "success",
            "data": searches,
            "count": len(searches)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/hot-searches/trend")
async def get_hot_search_trend(
    keyword: str = Query(..., min_length=1),
    days: int = Query(7, ge=1, le=30)
):
    """Return the heat trend for a keyword"""
    try:
        # Trend query logic goes here; omitted for brevity
        return {
            "code": 0,
            "message": "success",
            "data": {"keyword": keyword, "trend": []}
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/topics/{keyword}")
async def get_topic_detail(
    keyword: str,
    limit: int = Query(20, ge=1, le=100)
):
    """Return topic details"""
    try:
        # Topic detail query goes here
        return {
            "code": 0,
            "message": "success",
            "data": {"keyword": keyword}
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/export/csv")
async def export_to_csv(
    start_date: str = Query(None),
    end_date: str = Query(None),
    background_tasks: BackgroundTasks = None
):
    """Export data as CSV"""
    try:
        # Build the query conditions and export real data here
        # Mock data is generated for demonstration
        df = pd.DataFrame({
            "rank": range(1, 51),
            "keyword": [f"test keyword {i}" for i in range(50)],
            "hot_value": [1000000 - i * 10000 for i in range(50)],
            "time": [datetime.now().isoformat()] * 50
        })
        filename = f"weibo_hot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        filepath = f"./exports/{filename}"
        # Make sure the export directory exists
        os.makedirs("./exports", exist_ok=True)
        df.to_csv(filepath, index=False, encoding="utf-8-sig")
        return FileResponse(
            filepath,
            filename=filename,
            media_type="text/csv"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/stats/summary")
async def get_stats_summary():
    """Return summary statistics"""
    try:
        # Statistics logic goes here
        return {
            "code": 0,
            "message": "success",
            "data": {
                "total_records": 10000,
                "last_update": datetime.now().isoformat(),
                "top_keywords": [],
                "hot_categories": []
            }
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    # Pass the app as an import string so that reload=True works
    uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)
```

5. Deployment and Operation Guide
5.1 Environment Setup
```bash
# 1. Clone the project
git clone <repository-url>
cd weibo-hot-search-crawler

# 2. Create a virtual environment
python -m venv venv
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate

# 3. Install dependencies
pip install -r requirements.txt

# 4. Configure environment variables
cp .env.example .env
# Edit the .env file to configure the database and API settings
```
5.2 Running the System
```bash
# 1. Single run (for testing)
python main.py --once

# 2. Start scheduled collection
python main.py

# 3. Start the API service
python api.py

# 4. Run with Docker
docker build -t weibo-crawler .
docker run -d --name weibo-crawler weibo-crawler
```
5.3 Docker Deployment
```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy project files
COPY . .

# Create the data directory
RUN mkdir -p /app/exports

# Run the application
CMD ["python", "main.py"]
```
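If you want the crawler, the API service, and Redis to run together, a docker-compose file along the following lines is one option. This is a sketch that assumes the image above is reused for both the crawler and the API; service names and ports are illustrative.

```yaml
# docker-compose.yml (illustrative sketch)
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  crawler:
    build: .
    command: python main.py
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
  api:
    build: .
    command: python api.py
    ports:
      - "8000:8000"
    depends_on:
      - redis
```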
6. Optimization Tips and Caveats
6.1 Performance Optimization
Connection pool tuning: adjust the HTTP connection pool size to avoid repeatedly opening connections
Caching strategy: cache frequently accessed data in Redis
Batch inserts: write to the database in batches to reduce the number of IO round trips (see the sketch after this list)
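To illustrate the batch insert point, save_hot_searches could add all new rows in one flush/commit cycle instead of one at a time. This is a minimal sketch against the DatabaseManager and HotSearch classes from Section 4, not a drop-in replacement; it omits the per-row duplicate check and some optional fields for brevity.

```python
# batch_insert_sketch.py - batched write variant (illustrative)
from typing import Dict, List

from database import DatabaseManager
from models import HotSearch

async def save_hot_searches_batch(db: DatabaseManager, items: List[Dict]) -> int:
    """Insert all hot search rows in a single flush/commit cycle."""
    rows = [
        HotSearch(
            rank=item["rank"],
            keyword=item["keyword"],
            hot_value=item.get("hot_value", 0),
            raw_data=item.get("raw_item", {}),
        )
        for item in items
    ]
    async with db.get_session() as session:
        session.add_all(rows)  # one batch instead of per-row add + existence check
    return len(rows)
```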
6.2 Dealing with Anti-Crawling Measures
IP proxies: use a high-quality proxy IP pool
Request randomization: randomize request intervals and request headers
Captcha handling: integrate a captcha recognition service
Browser emulation: use Playwright for JS-rendered pages (a short sketch follows this list)
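For pages where the JSON API is not enough and content only appears after JavaScript runs, a Playwright fallback might look like this. It is a minimal sketch; the target URL is a placeholder and the Chromium browser must be installed beforehand.

```python
# browser_fetch.py - Playwright fallback for JS-rendered pages (illustrative)
import asyncio
from playwright.async_api import async_playwright

async def fetch_rendered_html(url: str) -> str:
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url, wait_until="networkidle")
        html = await page.content()  # HTML after JavaScript execution
        await browser.close()
        return html

if __name__ == "__main__":
    # Placeholder URL; run "playwright install chromium" once before using this
    print(len(asyncio.run(fetch_rendered_html("https://s.weibo.com/top/summary"))))
```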
6.3 Data Quality Assurance
Deduplication: build a solid data deduplication mechanism
Anomaly detection: monitor for abnormal fluctuations in the data
Data validation: check the format and completeness of collected data (see the sketch below)
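Since pydantic is already a dependency, one way to validate items before they reach the database is a small schema like the one below. The field names mirror the dictionaries produced by _parse_hot_search_data; this is a sketch rather than part of the original code.

```python
# validation_sketch.py - validate parsed hot search items with pydantic (illustrative)
from typing import Dict, List

from pydantic import BaseModel, Field, ValidationError

class HotSearchItem(BaseModel):
    rank: int = Field(ge=0)
    keyword: str = Field(min_length=1, max_length=255)
    hot_value: int = Field(ge=0)
    url: str = ""
    tag: str = ""

def filter_valid(items: List[Dict]) -> List[Dict]:
    """Drop items that fail schema validation instead of writing bad rows."""
    valid = []
    for raw in items:
        try:
            valid.append(HotSearchItem(**raw).model_dump())
        except ValidationError as exc:
            print(f"Dropping invalid item: {exc}")
    return valid
```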
6.4 Legal and Compliance Considerations
Respect the robots protocol: honor the site's robots.txt (a small check is sketched after this list)
Control request frequency: avoid putting pressure on the target site
Use data responsibly: comply with applicable laws and regulations
Protect user privacy: avoid collecting users' private information
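A lightweight way to honor robots.txt from the standard library is urllib.robotparser; the sketch below checks whether a given user agent may fetch a URL before crawling it. The URLs are examples only.

```python
# robots_check.py - robots.txt check with the standard library (illustrative)
from urllib.robotparser import RobotFileParser

def is_allowed(url: str, user_agent: str = "*") -> bool:
    """Return True if robots.txt permits fetching the URL."""
    parser = RobotFileParser()
    parser.set_url("https://weibo.com/robots.txt")  # example robots.txt location
    parser.read()
    return parser.can_fetch(user_agent, url)

if __name__ == "__main__":
    print(is_allowed("https://weibo.com/ajax/statuses/hot_band"))
```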
7. Ideas for Further Extensions
Real-time monitoring and alerting: trigger alerts when unusual hot searches appear
Sentiment analysis: analyze the sentiment of topic content (a small sketch follows this list)
Public opinion analysis: run opinion analysis on the collected hot search data
Data visualization: build dashboards for an intuitive view of the data
Cross-platform comparison: compare trending topics across multiple social platforms
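As a starting point for the sentiment analysis idea, one option for Chinese text is the snownlp package (an extra dependency not listed above, used here as an assumption). The sketch below scores the text of collected posts between 0 (negative) and 1 (positive).

```python
# sentiment_sketch.py - naive sentiment scoring of collected posts (illustrative)
from typing import Dict, List

from snownlp import SnowNLP  # pip install snownlp

def score_posts(posts: List[Dict]) -> List[Dict]:
    """Attach a 0..1 sentiment score to each post's text field."""
    scored = []
    for post in posts:
        text = post.get("text", "")
        sentiment = SnowNLP(text).sentiments if text else 0.5  # 0.5 = neutral fallback
        scored.append({**post, "sentiment": round(sentiment, 3)})
    return scored

if __name__ == "__main__":
    sample = [{"text": "这部电影太好看了"}, {"text": "服务态度非常差"}]
    print(score_posts(sample))
```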