
Python Web Scraping in Practice: Scheduled Collection of the Weibo Hot Search List and Topic Content with Async Techniques and API Reverse Engineering

Zhang Xiaoming · Front-end Development Engineer


1. Introduction: The Data Value Behind Weibo Hot Search

As one of China's largest social media platforms, Weibo publishes a hot search list that reflects social hot spots, public attention, and the direction of online opinion in real time. That makes the data highly valuable for public opinion monitoring, market research, and content creation. This article walks through building an efficient Weibo hot search collection system with modern Python crawling techniques, using asynchronous requests and API reverse engineering to fetch the hot search list and in-depth topic content on a schedule.

2. Technology Stack Overview: A Modern Crawler Toolkit

Before we start coding, here are the core technologies this project relies on:

  1. HTTP clients

    • httpx: an async HTTP client with HTTP/2 support; its async model outperforms the synchronous requests library for concurrent crawling (a minimal usage sketch follows this list)

    • aiohttp: an asynchronous HTTP client/server framework

  2. Asynchronous programming

    • asyncio: Python's built-in asynchronous I/O framework

    • anyio: a more unified async API layer across event-loop backends

  3. Data parsing

    • parsel: data extraction based on XPath and CSS selectors

    • json: handling the JSON payloads returned by the APIs

  4. Scheduling and storage

    • apscheduler: an advanced Python scheduler

    • sqlalchemy: SQL toolkit and ORM

    • redis: caching and message queuing

  5. Anti-anti-crawling strategies

    • Request header rotation

    • IP proxy pools

    • Browser fingerprint simulation
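
To make the async pattern concrete before diving into the full implementation, here is a minimal sketch of fetching the hot search endpoint with httpx. The endpoint URL is the one used later in the article's configuration; without valid cookies Weibo may reject or redirect the request, so treat this purely as an illustration of the request flow rather than a guaranteed working call.

python

import asyncio
import httpx


async def fetch_hot_band() -> dict:
    # HTTP/2 support requires the httpx[http2] extra (the h2 package)
    async with httpx.AsyncClient(http2=True, timeout=10.0) as client:
        response = await client.get("https://weibo.com/ajax/statuses/hot_band")
        response.raise_for_status()
        return response.json()


if __name__ == "__main__":
    data = asyncio.run(fetch_hot_band())
    print(data.get("ok"), len(data.get("data", {}).get("band_list", [])))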

3. Project Architecture

text

Weibo Hot Search Collection System
├── Data Collection Layer (Async Spider)
│   ├── Hot search list fetching module
│   ├── Topic content deep-crawl module
│   └── Anti-anti-crawling strategy module
├── Data Storage Layer (Database)
│   ├── Hot search table
│   ├── Topic content table
│   └── User profile table (optional)
├── Task Scheduling Layer (Scheduler)
│   ├── Timed triggering
│   ├── Failure retry
│   └── Task monitoring
└── Data API Layer (API)
    ├── Real-time data queries
    ├── Historical data analysis
    └── Data export

4. Core Code Implementation

4.1 Environment Setup and Dependency Installation

text

# requirements.txt
httpx[http2]>=0.24.0      # the http2 extra is needed because the client enables HTTP/2
aiohttp>=3.8.0
anyio>=3.0.0
parsel>=1.8.0
apscheduler>=3.10.0
sqlalchemy>=2.0.0
aiosqlite>=0.19.0         # async SQLite driver referenced by DATABASE_URL
redis>=4.5.0
pydantic>=2.0.0
pydantic-settings>=2.0.0  # BaseSettings moved into this package in pydantic v2
tenacity>=8.2.0           # retry decorators used by the spider
fastapi>=0.100.0
uvicorn>=0.23.0
pandas>=2.0.0
playwright>=1.35.0        # for pages that require complex JS rendering

4.2 Configuration Design

python

# config.py
from typing import List, Optional

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Weibo API endpoints
    WEIBO_HOT_API: str = "https://weibo.com/ajax/statuses/hot_band"
    WEIBO_TOPIC_API: str = "https://weibo.com/ajax/statuses/topic_band"
    WEIBO_DETAIL_API: str = "https://weibo.com/ajax/statuses/show"

    # Request settings
    REQUEST_TIMEOUT: int = 30
    CONCURRENT_LIMIT: int = 10
    RETRY_TIMES: int = 3

    # Proxy settings
    PROXY_ENABLED: bool = False
    PROXY_POOL: List[str] = []

    # Database settings
    DATABASE_URL: str = "sqlite+aiosqlite:///weibo_hot.db"
    REDIS_URL: str = "redis://localhost:6379/0"

    # Scheduling settings
    COLLECTION_INTERVAL: int = 300   # 5 minutes
    DEEP_CRAWL_INTERVAL: int = 3600  # 1 hour

    # Browser simulation settings
    USE_BROWSER: bool = False
    BROWSER_PATH: Optional[str] = None

    model_config = SettingsConfigDict(env_file=".env")


settings = Settings()

4.3 Data Model Definitions

python

# models.py
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, JSON, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class HotSearch(Base):
    """Hot search list data model"""
    __tablename__ = "hot_searches"

    id = Column(Integer, primary_key=True, autoincrement=True)
    rank = Column(Integer, nullable=False, comment="Rank")
    keyword = Column(String(255), nullable=False, index=True, comment="Keyword")
    url = Column(String(500), comment="Topic URL")
    tag = Column(String(50), comment="Label: hot, new, boiling, etc.")
    hot_value = Column(Integer, comment="Heat value")
    category = Column(String(100), comment="Category")
    created_at = Column(DateTime, default=datetime.now, index=True)
    raw_data = Column(JSON, comment="Raw API data")

    def __repr__(self):
        return f"<HotSearch(rank={self.rank}, keyword={self.keyword}, hot_value={self.hot_value})>"


class TopicContent(Base):
    """Topic content data model"""
    __tablename__ = "topic_contents"

    id = Column(Integer, primary_key=True, autoincrement=True)
    topic_id = Column(String(100), nullable=False, index=True)
    keyword = Column(String(255), nullable=False, index=True)
    title = Column(String(500), comment="Topic title")
    description = Column(Text, comment="Topic description")
    read_count = Column(Integer, comment="Read count")
    discuss_count = Column(Integer, comment="Discussion count")
    origin_url = Column(String(500), comment="Original URL")
    top_posts = Column(JSON, comment="Top Weibo posts")
    related_topics = Column(JSON, comment="Related topics")
    crawl_time = Column(DateTime, default=datetime.now, index=True)

    def __repr__(self):
        return f"<TopicContent(keyword={self.keyword}, read={self.read_count})>"


class CrawlLog(Base):
    """Crawl log"""
    __tablename__ = "crawl_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    task_type = Column(String(50), nullable=False)
    status = Column(String(20), nullable=False)  # success, failed, partial
    items_count = Column(Integer, default=0)
    error_message = Column(Text)
    created_at = Column(DateTime, default=datetime.now, index=True)

4.4 Core Asynchronous Spider Class

python

# spider.py
import asyncio
import json
import random
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


@dataclass
class RequestConfig:
    """Request configuration"""
    headers: Dict[str, str]
    timeout: float = 30.0
    verify_ssl: bool = True
    follow_redirects: bool = True


class WeiboHotSearchSpider:
    """Asynchronous Weibo hot search spider"""

    def __init__(self, config=None):
        self.config = config or {}
        self.client = None
        # User-Agent pool (must be set before building the request config, which samples from it)
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]
        self.request_config = self._create_request_config()

    def _create_request_config(self) -> RequestConfig:
        """Build the request configuration"""
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }
        return RequestConfig(headers=headers)

    @asynccontextmanager
    async def get_client(self):
        """Yield a configured async HTTP client"""
        limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
        timeout = httpx.Timeout(timeout=self.request_config.timeout)
        async with httpx.AsyncClient(
            headers=self.request_config.headers,
            timeout=timeout,
            limits=limits,
            verify=self.request_config.verify_ssl,
            follow_redirects=self.request_config.follow_redirects,
            http2=True,  # enable HTTP/2
        ) as client:
            yield client

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def fetch_hot_search(self) -> Dict[str, Any]:
        """
        Fetch the Weibo hot search list.
        Uses the official Weibo API to avoid parsing HTML.
        """
        api_url = "https://weibo.com/ajax/statuses/hot_band"
        async with self.get_client() as client:
            try:
                # Add a random delay to avoid overly frequent requests
                await asyncio.sleep(random.uniform(1, 3))
                response = await client.get(
                    api_url,
                    params={
                        "source": "weibo",
                        "type": "uid",
                        "uid": "",
                        "_t": int(datetime.now().timestamp() * 1000),
                    },
                )
                response.raise_for_status()
                data = response.json()
                return self._parse_hot_search_data(data)
            except httpx.RequestError as e:
                print(f"Request failed: {e}")
                raise
            except json.JSONDecodeError as e:
                print(f"JSON parsing failed: {e}")
                raise

    def _parse_hot_search_data(self, raw_data: Dict) -> Dict[str, Any]:
        """Parse the raw hot search list payload"""
        result = {
            "timestamp": datetime.now().isoformat(),
            "total": 0,
            "hot_searches": [],
            "topics": [],
            "raw_data": raw_data,
        }
        if "data" not in raw_data or "band_list" not in raw_data["data"]:
            return result

        band_list = raw_data["data"]["band_list"]
        result["total"] = len(band_list)

        for item in band_list:
            hot_item = {
                "rank": item.get("rank", 0),
                "keyword": item.get("word", ""),
                "url": f"https://s.weibo.com/weibo?q={item.get('word_scheme', '')}",
                "tag": item.get("label_name", ""),
                "hot_value": item.get("num", 0),
                "category": item.get("category", ""),
                "is_ad": item.get("is_ad", 0) == 1,
                "note": item.get("note", ""),
                "real_pos": item.get("realpos", 0),
                "raw_item": item,
            }
            result["hot_searches"].append(hot_item)

            # Extract topic information for the deep crawl
            if "mblog" in item and item["mblog"]:
                result["topics"].append({
                    "topic_id": item["mblog"].get("id", ""),
                    "keyword": hot_item["keyword"],
                    "title": item["mblog"].get("text", "")[:100],
                })
        return result

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=5))
    async def fetch_topic_detail(self, topic_id: str, keyword: str) -> Optional[Dict]:
        """
        Fetch topic details via the Weibo topic API.
        """
        api_url = "https://weibo.com/ajax/statuses/topic_band"
        async with self.get_client() as client:
            try:
                await asyncio.sleep(random.uniform(0.5, 1.5))
                params = {
                    "q": keyword,
                    "page": 1,
                    "count": 20,
                    "_t": int(datetime.now().timestamp() * 1000),
                }
                response = await client.get(api_url, params=params)
                response.raise_for_status()
                data = response.json()
                return self._parse_topic_data(data, keyword)
            except Exception as e:
                print(f"Failed to fetch topic detail for {keyword}: {e}")
                return None

    def _parse_topic_data(self, raw_data: Dict, keyword: str) -> Dict[str, Any]:
        """Parse topic data"""
        result = {
            "keyword": keyword,
            "crawl_time": datetime.now().isoformat(),
            "read_count": 0,
            "discuss_count": 0,
            "top_posts": [],
            "related_topics": [],
            "raw_data": raw_data,
        }
        if "data" not in raw_data:
            return result

        data = raw_data["data"]

        # Read and discussion counts
        if "cardlistInfo" in data:
            card_info = data["cardlistInfo"]
            result["read_count"] = card_info.get("total", 0)
            result["discuss_count"] = card_info.get("page_size", 0)

        # Extract top posts
        if "cards" in data:
            for card in data["cards"]:
                if "mblog" in card:
                    blog = card["mblog"]
                    post = {
                        "id": blog.get("id", ""),
                        "text": blog.get("text", ""),
                        "user": blog.get("user", {}).get("screen_name", ""),
                        "reposts": blog.get("reposts_count", 0),
                        "comments": blog.get("comments_count", 0),
                        "attitudes": blog.get("attitudes_count", 0),
                        "created_at": blog.get("created_at", ""),
                    }
                    result["top_posts"].append(post)
        return result

    async def fetch_multiple_topics(self, topics: List[Dict], max_concurrent: int = 5) -> List[Dict]:
        """Fetch several topic details concurrently"""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_with_semaphore(topic: Dict) -> Optional[Dict]:
            async with semaphore:
                return await self.fetch_topic_detail(
                    topic.get("topic_id", ""),
                    topic.get("keyword", ""),
                )

        tasks = [fetch_with_semaphore(topic) for topic in topics]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions and empty results
        valid_results = []
        for result in results:
            if isinstance(result, dict):
                valid_results.append(result)
            elif result is not None:
                print(f"Task raised an exception: {type(result).__name__}")
        return valid_results

4.5 Database Access Layer

python

# database.py
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, List

from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from models import Base, CrawlLog, HotSearch, TopicContent


class DatabaseManager:
    """Asynchronous database manager"""

    def __init__(self, database_url: str):
        self.engine = create_async_engine(
            database_url,
            echo=False,
            pool_pre_ping=True,
            pool_recycle=3600,
        )
        self.async_session = async_sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False
        )

    async def init_db(self):
        """Create the database tables"""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    @asynccontextmanager
    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a database session that commits on success and rolls back on error"""
        async with self.async_session() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

    async def save_hot_searches(self, hot_searches: List[Dict]) -> int:
        """Persist hot search items, skipping keywords already stored for this collection time"""
        saved_count = 0
        batch_time = datetime.now()  # one timestamp per collection batch
        async with self.get_session() as session:
            for item in hot_searches:
                # Skip keywords that were already saved at this collection time
                existing = await session.execute(
                    select(HotSearch).where(
                        HotSearch.keyword == item["keyword"],
                        HotSearch.created_at >= batch_time,
                    )
                )
                if existing.scalar_one_or_none() is None:
                    hot_search = HotSearch(
                        rank=item["rank"],
                        keyword=item["keyword"],
                        url=item.get("url", ""),
                        tag=item.get("tag", ""),
                        hot_value=item.get("hot_value", 0),
                        category=item.get("category", ""),
                        created_at=batch_time,
                        raw_data=item.get("raw_item", {}),
                    )
                    session.add(hot_search)
                    saved_count += 1
            await session.commit()
        return saved_count

    async def save_topic_contents(self, topics: List[Dict]) -> int:
        """Persist topic contents"""
        saved_count = 0
        async with self.get_session() as session:
            for topic in topics:
                # Build a topic_id from the keyword and the current hour
                topic_id = f"{topic['keyword']}_{datetime.now().strftime('%Y%m%d%H')}"

                # crawl_time arrives as an ISO string from the spider; the column expects datetime
                crawl_time = topic.get("crawl_time")
                if isinstance(crawl_time, str):
                    crawl_time = datetime.fromisoformat(crawl_time)
                elif crawl_time is None:
                    crawl_time = datetime.now()

                topic_content = TopicContent(
                    topic_id=topic_id,
                    keyword=topic["keyword"],
                    title=topic.get("title", "")[:200],
                    description=topic.get("description", ""),
                    read_count=topic.get("read_count", 0),
                    discuss_count=topic.get("discuss_count", 0),
                    origin_url=topic.get("url", ""),
                    top_posts=topic.get("top_posts", []),
                    related_topics=topic.get("related_topics", []),
                    crawl_time=crawl_time,
                )
                session.add(topic_content)
                saved_count += 1
            await session.commit()
        return saved_count

    async def get_recent_hot_searches(self, limit: int = 50) -> List[Dict]:
        """Return the most recent hot search records"""
        async with self.get_session() as session:
            result = await session.execute(
                select(HotSearch)
                .order_by(desc(HotSearch.created_at), HotSearch.rank)
                .limit(limit)
            )
            searches = result.scalars().all()
            return [{
                "rank": s.rank,
                "keyword": s.keyword,
                "hot_value": s.hot_value,
                "tag": s.tag,
                "created_at": s.created_at.isoformat(),
            } for s in searches]

    async def log_crawl_task(self, task_type: str, status: str,
                             items_count: int = 0, error_msg: str = ""):
        """Record a crawl log entry"""
        async with self.get_session() as session:
            log = CrawlLog(
                task_type=task_type,
                status=status,
                items_count=items_count,
                error_message=error_msg,
            )
            session.add(log)
            await session.commit()

4.6 Scheduled Task Scheduler

python

# scheduler.py
import logging
from datetime import datetime
from typing import Callable, Dict

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger


class TaskScheduler:
    """Asynchronous task scheduler"""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.logger = logging.getLogger(__name__)

    def add_interval_task(self, func: Callable, seconds: int,
                          args: tuple = (), kwargs: Dict = None):
        """Register an interval task"""
        trigger = IntervalTrigger(seconds=seconds)
        self.scheduler.add_job(
            func,
            trigger,
            args=args,
            kwargs=kwargs or {},
            id=f"{func.__name__}_{datetime.now().timestamp()}",
            replace_existing=True,
        )
        self.logger.info(f"Added interval task: {func.__name__}, every {seconds} seconds")

    def add_cron_task(self, func: Callable, hour: str = "*", minute: str = "0",
                      args: tuple = (), kwargs: Dict = None):
        """Register a cron task"""
        trigger = CronTrigger(hour=hour, minute=minute)
        self.scheduler.add_job(
            func,
            trigger,
            args=args,
            kwargs=kwargs or {},
            id=f"{func.__name__}_cron",
            replace_existing=True,
        )
        self.logger.info(f"Added cron task: {func.__name__}, at {hour}:{minute}")

    async def start(self):
        """Start the scheduler; the caller is responsible for keeping the event loop alive"""
        if not self.scheduler.running:
            self.scheduler.start()
            self.logger.info("Task scheduler started")

    def shutdown(self):
        """Shut down the scheduler"""
        if self.scheduler.running:
            self.scheduler.shutdown()
            self.logger.info("Task scheduler stopped")

4.7 Main Program Integration

python

# main.py
import asyncio
import logging
import signal
import sys
from typing import Dict, List

from config import settings
from database import DatabaseManager
from scheduler import TaskScheduler
from spider import WeiboHotSearchSpider

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("weibo_crawler.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)


class WeiboHotSearchCollector:
    """Main program for Weibo hot search collection"""

    def __init__(self):
        self.spider = WeiboHotSearchSpider()
        self.db = DatabaseManager(settings.DATABASE_URL)
        self.scheduler = TaskScheduler()
        self.is_running = True
        self.cached_topics: List[Dict] = []

        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Handle shutdown signals"""
        logger.info(f"Received signal {signum}, shutting down...")
        self.is_running = False

    async def initialize(self):
        """Initialize the system"""
        logger.info("Initializing the Weibo hot search collection system...")

        # Initialize the database
        await self.db.init_db()
        logger.info("Database initialized")

        # Register scheduled tasks
        self.scheduler.add_interval_task(
            self.collect_hot_searches,
            settings.COLLECTION_INTERVAL,
        )
        # Deep-crawl topics once an hour, at half past
        self.scheduler.add_cron_task(
            self.collect_topic_details,
            hour="*",
            minute="30",
        )
        logger.info("Scheduled tasks registered")

    async def collect_hot_searches(self):
        """Collect the hot search list"""
        logger.info("Collecting the Weibo hot search list...")
        try:
            # Fetch the hot search data
            hot_data = await self.spider.fetch_hot_search()

            # Persist it
            saved_count = await self.db.save_hot_searches(hot_data["hot_searches"])

            # Log the task
            await self.db.log_crawl_task("hot_search", "success", saved_count)
            logger.info(f"Hot search collection finished, saved {saved_count} records")

            # Keep the top entries for the later deep crawl
            topics_for_deep_crawl = []
            for item in hot_data["hot_searches"][:10]:  # only deep-crawl the top 10
                if "raw_item" in item and "mblog" in item["raw_item"]:
                    topics_for_deep_crawl.append({
                        "keyword": item["keyword"],
                        "topic_id": item["raw_item"]["mblog"].get("id", ""),
                        "rank": item["rank"],
                    })

            # Cache the topic info for the deep crawl (could also go into Redis)
            self.cached_topics = topics_for_deep_crawl

        except Exception as e:
            logger.error(f"Hot search collection failed: {e}")
            await self.db.log_crawl_task("hot_search", "failed", error_msg=str(e))

    async def collect_topic_details(self):
        """Deep-crawl topic details"""
        if not self.cached_topics:
            logger.warning("No topics available for deep crawling")
            return

        logger.info(f"Deep-crawling {len(self.cached_topics)} topics...")
        try:
            # Fetch topic details concurrently
            topic_details = await self.spider.fetch_multiple_topics(
                self.cached_topics, max_concurrent=3
            )

            # Persist the topic contents
            saved_count = await self.db.save_topic_contents(topic_details)

            # Log the task
            await self.db.log_crawl_task("topic_detail", "success", saved_count)
            logger.info(f"Topic detail collection finished, saved {saved_count} records")

        except Exception as e:
            logger.error(f"Topic detail collection failed: {e}")
            await self.db.log_crawl_task("topic_detail", "failed", error_msg=str(e))

    async def run_once(self):
        """Run all collection tasks once"""
        logger.info("Running a single collection pass...")
        await self.collect_hot_searches()
        await asyncio.sleep(5)  # brief pause between the two stages
        await self.collect_topic_details()
        logger.info("Single collection pass finished")

    async def run_scheduled(self):
        """Run the scheduled collection loop"""
        logger.info("Starting scheduled collection...")
        await self.initialize()
        await self.scheduler.start()
        # Keep the event loop alive until a shutdown signal arrives
        while self.is_running:
            await asyncio.sleep(1)
        self.scheduler.shutdown()

    async def cleanup(self):
        """Release resources"""
        logger.info("Cleaning up resources...")
        # Additional cleanup logic can go here
        logger.info("Cleanup finished")


async def main():
    """Program entry point"""
    collector = WeiboHotSearchCollector()
    try:
        # Choose the run mode from the command line
        if len(sys.argv) > 1 and sys.argv[1] == "--once":
            await collector.run_once()
        else:
            await collector.run_scheduled()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    finally:
        await collector.cleanup()
        logger.info("Program exited cleanly")


if __name__ == "__main__":
    # Require a recent Python version
    if sys.version_info < (3, 8):
        print("Please use Python 3.8 or newer")
        sys.exit(1)

    # Run the main program
    asyncio.run(main())

4.8 Enhanced Anti-Anti-Crawling Strategies

python

# anti_anti_spider.py
import hashlib
import random
import time
from typing import Dict


class AntiAntiSpider:
    """Helpers for working around common anti-crawling measures"""

    def __init__(self):
        self.cookie_pool = []
        self.proxy_pool = []
        self.last_request_time = 0
        self.min_request_interval = 1.0  # minimum interval between requests, in seconds

    def generate_fingerprint(self) -> Dict[str, str]:
        """Generate a browser-like header fingerprint"""
        return {
            "User-Agent": self.get_random_user_agent(),
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        }

    def get_random_user_agent(self) -> str:
        """Pick a random User-Agent"""
        user_agents = [
            # Chrome
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            # Firefox
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/120.0",
            # Safari
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
        ]
        return random.choice(user_agents)

    def add_delay(self):
        """Throttle requests with a random delay"""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.min_request_interval:
            sleep_time = self.min_request_interval - elapsed + random.uniform(0.5, 2.0)
            time.sleep(sleep_time)
        self.last_request_time = time.time()

    def generate_signature(self, params: Dict, secret: str = "weibo") -> str:
        """Generate a request signature (a mock MD5 scheme imitating signed APIs, not Weibo's real algorithm)"""
        # Sort the parameters and join them
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
        # Append the secret and hash
        sign_str = param_str + secret
        return hashlib.md5(sign_str.encode()).hexdigest()

    def rotate_proxy(self) -> Dict[str, str]:
        """Rotate to a random proxy (requests-style mapping; adapt the format for httpx)"""
        if not self.proxy_pool:
            return {}
        proxy = random.choice(self.proxy_pool)
        return {"http": proxy, "https": proxy}

    def simulate_human_behavior(self):
        """Roughly simulate human pauses between actions"""
        behaviors = [
            lambda: time.sleep(random.uniform(0.1, 0.5)),
            lambda: None,  # placeholder for a simulated click
            lambda: None,  # placeholder for simulated scrolling
        ]
        random.choice(behaviors)()

4.9 Data Export and API Endpoints

python

# api.py
import os
from datetime import datetime

import pandas as pd
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse

from config import settings
from database import DatabaseManager

app = FastAPI(title="Weibo Hot Search Data API", version="1.0.0")

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global database manager
db_manager = None


@app.on_event("startup")
async def startup():
    """Initialize the database on startup"""
    global db_manager
    db_manager = DatabaseManager(settings.DATABASE_URL)
    await db_manager.init_db()


@app.get("/")
async def root():
    """Root endpoint"""
    return {"message": "Weibo Hot Search Data API", "status": "running"}


@app.get("/api/hot-searches/recent")
async def get_recent_hot_searches(
    limit: int = Query(50, ge=1, le=1000),
    hours: int = Query(24, ge=1, le=720),
):
    """Return the most recent hot search records"""
    try:
        searches = await db_manager.get_recent_hot_searches(limit)
        return {
            "code": 0,
            "message": "success",
            "data": searches,
            "count": len(searches),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/hot-searches/trend")
async def get_hot_search_trend(
    keyword: str = Query(..., min_length=1),
    days: int = Query(7, ge=1, le=30),
):
    """Return the heat trend for a keyword"""
    try:
        # Trend query logic goes here; omitted for brevity
        return {
            "code": 0,
            "message": "success",
            "data": {"keyword": keyword, "trend": []},
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/topics/{keyword}")
async def get_topic_detail(
    keyword: str,
    limit: int = Query(20, ge=1, le=100),
):
    """Return topic details"""
    try:
        # Topic detail query logic goes here
        return {
            "code": 0,
            "message": "success",
            "data": {"keyword": keyword},
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/export/csv")
async def export_to_csv(
    start_date: str = Query(None),
    end_date: str = Query(None),
    background_tasks: BackgroundTasks = None,
):
    """Export data as CSV"""
    try:
        # Build the query conditions and export logic here;
        # a mock data frame is generated as a placeholder
        df = pd.DataFrame({
            "rank": range(1, 51),
            "keyword": [f"test keyword {i}" for i in range(50)],
            "hot_value": [1000000 - i * 10000 for i in range(50)],
            "time": [datetime.now().isoformat()] * 50,
        })

        filename = f"weibo_hot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        filepath = f"./exports/{filename}"

        # Make sure the export directory exists
        os.makedirs("./exports", exist_ok=True)
        df.to_csv(filepath, index=False, encoding="utf-8-sig")

        return FileResponse(filepath, filename=filename, media_type="text/csv")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/stats/summary")
async def get_stats_summary():
    """Return a statistics summary"""
    try:
        # Statistics logic goes here
        return {
            "code": 0,
            "message": "success",
            "data": {
                "total_records": 10000,
                "last_update": datetime.now().isoformat(),
                "top_keywords": [],
                "hot_categories": [],
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    # Pass the import string so reload=True actually works
    uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)

5. Deployment and Operation Guide

5.1 Environment Setup

bash

# 1. Clone the project
git clone <repository-url>
cd weibo-hot-search-crawler

# 2. Create a virtual environment
python -m venv venv
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate

# 3. Install dependencies
pip install -r requirements.txt

# 4. Configure environment variables
cp .env.example .env
# Edit .env to configure the database and API settings

5.2 Running the System

bash

# 1. Single run (for testing)
python main.py --once

# 2. Start scheduled collection
python main.py

# 3. Start the API service
python api.py

# 4. Run with Docker
docker build -t weibo-crawler .
docker run -d --name weibo-crawler weibo-crawler

5.3 Docker Deployment

dockerfile

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency list
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the project files
COPY . .

# Create the data directory
RUN mkdir -p /app/exports

# Run the application
CMD ["python", "main.py"]

6. Optimization Tips and Caveats

6.1 Performance Optimization

  1. Connection pool tuning: size the HTTP connection pool appropriately to avoid repeatedly opening connections

  2. Caching: use Redis to cache frequently accessed data

  3. Batch inserts: write to the database in batches to reduce I/O round trips (see the sketch after this list)
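
As a concrete illustration of item 3, here is a minimal sketch of a bulk-insert variant of the hot search save path. It assumes the HotSearch model and the DatabaseManager.get_session() helper from section 4.5; the helper name bulk_save_hot_searches is hypothetical.

python

# A hypothetical bulk-insert helper built on the components from section 4.5.
from datetime import datetime
from typing import Dict, List

from sqlalchemy import insert

from models import HotSearch


async def bulk_save_hot_searches(db_manager, items: List[Dict]) -> int:
    """Insert a whole batch of hot search rows in one executemany round trip."""
    batch_time = datetime.now()
    rows = [
        {
            "rank": item["rank"],
            "keyword": item["keyword"],
            "url": item.get("url", ""),
            "tag": item.get("tag", ""),
            "hot_value": item.get("hot_value", 0),
            "category": item.get("category", ""),
            "created_at": batch_time,
            "raw_data": item.get("raw_item", {}),
        }
        for item in items
    ]
    if not rows:
        return 0
    async with db_manager.get_session() as session:
        # Passing a list of dicts to insert() issues a single bulk INSERT
        await session.execute(insert(HotSearch), rows)
    return len(rows)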

6.2 Dealing with Anti-Crawling Measures

  1. IP proxies: use a pool of high-quality proxy IPs (a usage sketch follows this list)

  2. Request randomization: randomize request intervals and headers

  3. CAPTCHA handling: integrate a CAPTCHA recognition service

  4. Browser emulation: use Playwright for pages that require JS rendering
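
For item 1, a minimal sketch of plugging a rotating proxy into the httpx client used in this project. The PROXY_POOL values are placeholders (in practice they would come from settings.PROXY_POOL), and the exact keyword argument depends on your httpx version.

python

import random

import httpx

# Placeholder proxies; in practice these would come from settings.PROXY_POOL
PROXY_POOL = [
    "http://127.0.0.1:7890",
    "http://127.0.0.1:7891",
]


def pick_proxy() -> str:
    """Pick a random proxy URL from the pool."""
    return random.choice(PROXY_POOL)


async def fetch_via_proxy(url: str) -> httpx.Response:
    # httpx 0.26+ accepts `proxy=`; older releases use `proxies=` instead
    async with httpx.AsyncClient(proxy=pick_proxy(), timeout=15.0) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response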

6.3 Data Quality Assurance

  1. Deduplication: build a solid deduplication mechanism (see the Redis-based sketch after this list)

  2. Anomaly detection: monitor for abnormal fluctuations in the data

  3. Validation: check the format and completeness of collected data
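
For the deduplication item, one lightweight approach is an atomic Redis "SET NX EX" check keyed by keyword, so each keyword is treated as new only once per time window. A minimal sketch, assuming the redis-py asyncio client and the REDIS_URL from the configuration; the helper names are hypothetical.

python

import redis.asyncio as aioredis


async def is_new_hot_item(r: aioredis.Redis, keyword: str, window_seconds: int = 3600) -> bool:
    """Atomically record a keyword; returns True only the first time within the window."""
    # SET ... NX EX succeeds only if the key does not already exist
    key = f"weibo:hot:seen:{keyword}"
    return bool(await r.set(key, 1, nx=True, ex=window_seconds))


async def filter_new_items(items, redis_url: str = "redis://localhost:6379/0"):
    """Keep only hot search items not seen within the dedup window."""
    r = aioredis.from_url(redis_url)
    try:
        return [item for item in items if await is_new_hot_item(r, item["keyword"])]
    finally:
        await r.close()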

6.4 Legal and Compliance Considerations

  1. Respect robots.txt: honor the target site's robots policy

  2. Throttle request rates: avoid putting pressure on the target site

  3. Use data responsibly: comply with applicable laws and regulations

  4. Protect user privacy: do not collect personally identifiable information

7. Suggested Extensions

  1. Real-time monitoring and alerting: raise alerts for unusual hot searches

  2. Sentiment analysis: analyze the sentiment of topic content (a small sketch follows this list)

  3. Public opinion analysis: build opinion analysis on top of the hot search data

  4. Data visualization: build dashboards to present the data intuitively

  5. Cross-platform comparison: compare trending topics across social platforms
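
As a starting point for item 2, sentiment scoring of the collected top posts could be prototyped with the SnowNLP library (not used elsewhere in this article). The sketch below simply averages per-post sentiment scores, where values close to 1 lean positive and values close to 0 lean negative.

python

from snownlp import SnowNLP


def score_topic_sentiment(top_posts: list) -> float:
    """Return the average sentiment (0 = negative, 1 = positive) of a topic's posts."""
    texts = [post.get("text", "") for post in top_posts if post.get("text")]
    if not texts:
        return 0.5  # neutral default when there is nothing to score
    scores = [SnowNLP(text).sentiments for text in texts]
    return sum(scores) / len(scores)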
