news 2026/5/29 0:53:45

Python Web Scraping in Practice: Collecting Online Course Data Efficiently with Playwright and Asyncio


张小明

Front-end Developer


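Abstract

Online education platforms now host a very large catalogue of courses. This article walks through building an efficient, stable course-collection system in Python on top of Playwright, an asynchronous browser automation tool. It covers working around anti-bot measures, rendering dynamic content, and persisting the collected data, and it includes a full code listing with selectors configured for Coursera and Udemy.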

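Tech Stack Overview

  • Playwright: Microsoft's cross-browser automation tool (Chromium, Firefox, WebKit); supports headless mode and executes page JavaScript, so dynamically rendered content is available

  • Asyncio: Python's built-in asynchronous I/O framework, used here for concurrent collection

  • BeautifulSoup4: HTML parsing library

  • Pandas: data processing and analysis

  • MongoDB/PostgreSQL: data storage options

  • Proxy IP pool and user-agent rotation: reduce the chance of IP bans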
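To make the Asyncio entry concrete, here is a minimal sketch of bounded concurrency with `asyncio.Semaphore`, the same pattern the crawler uses for detail pages. The names `fetch_course` and `crawl_all` are hypothetical, and the network call is replaced by a short sleep so the example runs anywhere.

```python
import asyncio
from typing import List


async def fetch_course(course_id: int, semaphore: asyncio.Semaphore) -> str:
    """Stand-in for a real page fetch; sleeps instead of hitting the network."""
    async with semaphore:          # at most `concurrency` bodies run at once
        await asyncio.sleep(0.01)  # placeholder for page.goto(...) and parsing
        return f"course-{course_id}"


async def crawl_all(course_ids: List[int], concurrency: int = 3) -> List[str]:
    """Fetch every course while capping the number of in-flight fetches."""
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [fetch_course(cid, semaphore) for cid in course_ids]
    # gather() returns results in the same order as the input tasks
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(crawl_all([1, 2, 3, 4, 5])))
```

All tasks are created up front, but the semaphore lets only `concurrency` of them do work at a time, which keeps the load on the target site predictable.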

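I. Environment Setup and Installation

text

# requirements.txt
# asyncio ships with Python 3 and must not be installed from PyPI
playwright==1.40.0
beautifulsoup4==4.12.2
pandas==2.1.3
aiohttp==3.9.1
motor==3.3.2          # async MongoDB driver
asyncpg==0.29.0       # async PostgreSQL driver
fake-useragent==1.4.0
redis==5.0.1
bloom-filter          # imported below as bloom_filter (version left unpinned)
flask                 # used only by the monitoring dashboard
psutil                # used only by the monitoring dashboard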

Install commands:

bash

pip install -r requirements.txt
playwright install chromium  # download the Chromium browser binary

II. Crawler Architecture

python

"""
Online course collection system architecture

┌───────────────────────────────────────────────┐
│              Scheduling manager               │
│        (Scheduler with Priority Queue)        │
└───────────────────────┬───────────────────────┘
                        │
       ┌────────────────┼────────────────┐
       ▼                ▼                ▼
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│Request queue│  │ Proxy pool  │  │ User agents │
│   (Redis)   │  │   (Redis)   │  │ (rotation)  │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       │                │                │
       └────────────────┼────────────────┘
                        ▼
                ┌───────────────┐
                │ Async crawler │
                │ (Playwright)  │
                └───────┬───────┘
                        │
       ┌────────────────┼────────────────┐
       ▼                ▼                ▼
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│   Parser    │  │Dedup filter │  │Error handler│
│    (BS4)    │  │   (Bloom)   │  │   (retry)   │
└──────┬──────┘  └─────────────┘  └─────────────┘
       │
       ▼
┌────────────────────────────┐
│       Data pipeline        │
│ (clean → validate → store) │
└────────────────────────────┘
"""
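The diagram includes an error handler with a retry mechanism. One common way to build that component is retry with exponential backoff; the sketch below is an assumption about how it could look, not code from the listing that follows. `retry_async` and its parameters are hypothetical names.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Run `operation`, retrying on any exception with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            # Delay doubles each attempt (base, 2*base, 4*base, ...) plus jitter
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay / 2)
            await asyncio.sleep(delay)
            attempt += 1
```

A detail-page fetch could then be wrapped as `await retry_async(lambda: page.goto(url))`, so transient network errors do not immediately count as failures.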

III. Full Crawler Implementation

python

"""
online_education_crawler.py

Online course crawler. Selectors are configured for Coursera and Udemy;
other platforms (edX, 网易云课堂, 慕课网, ...) can be added to
`platform_selectors` and otherwise fall back to generic parsing.
"""
import asyncio
import hashlib
import json
import logging
import os
import random
import re
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode, urljoin

# Third-party libraries
import asyncpg
import pandas as pd
import redis.asyncio as redis
from bloom_filter import BloomFilter
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from motor.motor_asyncio import AsyncIOMotorClient
from playwright.async_api import BrowserContext, Page, async_playwright

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('education_crawler.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# Data model
@dataclass
class Course:
    """Course data model."""
    platform: str                        # platform name
    title: str                           # course title
    url: str                             # course link
    instructors: List[str]               # instructor names
    rating: Optional[float]              # average rating
    rating_count: Optional[int]          # number of ratings
    duration: Optional[str]              # course length
    level: Optional[str]                 # difficulty level
    language: Optional[str]              # teaching language
    subtitle_languages: List[str]        # subtitle languages
    price: Optional[float]               # current price
    original_price: Optional[float]      # list price
    discount: Optional[float]            # discount
    students_enrolled: Optional[int]     # enrollment count
    description: str                     # course description
    learning_outcomes: List[str]         # learning outcomes
    syllabus: List[Dict[str, Any]]       # syllabus
    category: str                        # category
    subcategory: str                     # subcategory
    tags: List[str]                      # tags
    created_at: datetime                 # creation time
    updated_at: datetime                 # last update time
    popularity_score: Optional[float]    # popularity score

    def to_dict(self) -> Dict:
        """Convert to a dict with ISO-formatted timestamps."""
        data = asdict(self)
        data['created_at'] = data['created_at'].isoformat()
        data['updated_at'] = data['updated_at'].isoformat()
        return data

    @classmethod
    def generate_id(cls, platform: str, title: str) -> str:
        """Build a stable ID from platform and title."""
        unique_str = f"{platform}_{title}"
        return hashlib.md5(unique_str.encode()).hexdigest()


class ProxyPool:
    """Proxy pool backed by a Redis set."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = None
        self.redis_url = redis_url

    async def init(self):
        """Create the Redis client."""
        # decode_responses=True makes smembers() return str instead of bytes
        self.redis = redis.from_url(self.redis_url, decode_responses=True)

    async def add_proxy(self, proxy: str):
        """Add a proxy."""
        await self.redis.sadd('proxies', proxy)

    async def get_random_proxy(self) -> Optional[str]:
        """Return a random proxy, or None if the pool is empty."""
        proxies = await self.redis.smembers('proxies')
        return random.choice(list(proxies)) if proxies else None

    async def remove_proxy(self, proxy: str):
        """Remove a dead proxy."""
        await self.redis.srem('proxies', proxy)


class EducationCrawler:
    """Core crawler for online course platforms."""

    def __init__(self, config: Dict):
        self.config = config
        self.user_agent = UserAgent()
        self.proxy_pool = ProxyPool(config.get('redis_url', 'redis://localhost:6379'))
        self.bloom_filter = BloomFilter(max_elements=1000000, error_rate=0.001)
        self.stats = {
            'total_crawled': 0,
            'success': 0,
            'failed': 0,
            'duplicates': 0
        }

        # Platform-specific selectors. These depend on each site's current
        # markup and should be verified against the live pages before use.
        self.platform_selectors = {
            'coursera': {
                'course_card': 'div.rc-DesktopSearchCard',
                'title': 'h2[data-testid="search-result-card-title"]',
                'instructor': 'span[data-testid="search-result-card-instructor"]',
                'rating': 'span[data-testid="search-result-card-rating"]',
                'enrollment': 'span[data-testid="search-result-card-enrollments"]'
            },
            'udemy': {
                'course_card': 'div[data-purpose="search-course-card"]',
                'title': 'div[data-purpose="course-title-url"]',
                'instructor': 'div[data-purpose="safely-set-inner-html:course-card:visible-instructors"]',
                'rating': 'span[data-purpose="rating-number"]',
                'price': 'div[data-purpose="course-price-text"]'
            }
            # Other platforms can be added here...
        }

    async def init_resources(self):
        """Initialize external resources."""
        await self.proxy_pool.init()
        logger.info("Crawler resources initialized")

    async def crawl_platform(self, platform: str, base_url: str, search_params: Dict):
        """
        Crawl the courses of one platform.

        Args:
            platform: platform name
            base_url: base search URL
            search_params: search query parameters
        """
        logger.info(f"Start crawling platform {platform}")

        async with async_playwright() as p:
            # Launch the browser (headless mode is configurable)
            browser = await p.chromium.launch(
                headless=self.config.get('headless', True),
                proxy=await self._get_proxy_config(),
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-sandbox',
                    '--disable-setuid-sandbox'
                ]
            )

            # Create a context that resembles a regular desktop browser
            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.user_agent.random,
                locale='zh-CN',
                timezone_id='Asia/Shanghai'
            )

            # Inject the scripts that mask automation fingerprints
            await self._inject_stealth_scripts(context)

            page = await context.new_page()

            try:
                # Build the search URL
                search_url = self._build_search_url(base_url, search_params)
                await page.goto(search_url, wait_until='networkidle')

                # Dismiss login or promotional popups, if any
                await self._handle_popups(page)

                # Scroll to trigger lazy loading
                await self._auto_scroll(page)

                # Extract the course list
                courses = await self._extract_courses(page, platform)

                # Visit the detail pages concurrently
                await self._process_course_details(page, courses, platform)

            except Exception as e:
                logger.error(f"Crawling {platform} failed: {str(e)}")
            finally:
                await browser.close()

    async def _extract_courses(self, page: Page, platform: str) -> List[Dict]:
        """Extract basic course information from the search results."""
        selector_config = self.platform_selectors.get(platform, {})

        course_cards = await page.query_selector_all(
            selector_config.get('course_card', 'div[class*="course"], div[class*="card"]')
        )

        courses = []
        for card in course_cards:
            self.stats['total_crawled'] += 1
            try:
                title_elem = await card.query_selector(selector_config.get('title', 'h2, h3'))
                title = (await title_elem.text_content() if title_elem else None) or "Untitled"
                title = title.strip()

                link_elem = await card.query_selector('a[href*="/course/"], a[href*="/learn/"]')
                href = await link_elem.get_attribute('href') if link_elem else None
                if not href:
                    continue  # no detail page to visit
                url = urljoin(self.config['base_urls'][platform], href)

                # Deduplicate
                course_id = Course.generate_id(platform, title)
                if course_id in self.bloom_filter:
                    self.stats['duplicates'] += 1
                    continue
                self.bloom_filter.add(course_id)

                courses.append({
                    'platform': platform,
                    'title': title,
                    'url': url,
                    'course_id': course_id
                })

            except Exception as e:
                logger.warning(f"Failed to extract course info: {str(e)}")
                continue

        logger.info(f"Extracted {len(courses)} courses from {platform}")
        return courses

    async def _process_course_details(self, page: Page, courses: List[Dict], platform: str):
        """Visit each course detail page and save the result."""
        semaphore = asyncio.Semaphore(self.config.get('concurrency', 5))

        async def process_course(course: Dict):
            async with semaphore:
                # Open the detail page in a new tab of the same context
                detail_page = await page.context.new_page()
                try:
                    # Random delay so requests are not fired too quickly
                    await asyncio.sleep(random.uniform(1, 3))

                    await detail_page.goto(course['url'], wait_until='networkidle')

                    # Extract the details and merge them into the course dict
                    detail_data = await self._extract_course_details(detail_page, platform)
                    course.update(detail_data)

                    await self._save_course_data(course)

                    self.stats['success'] += 1
                    logger.info(f"Saved course: {course['title']}")

                except Exception as e:
                    self.stats['failed'] += 1
                    logger.error(f"Failed to process course {course['url']}: {str(e)}")
                finally:
                    # Close the tab even when extraction fails
                    await detail_page.close()

        tasks = [process_course(course) for course in courses[:self.config.get('max_courses', 100)]]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _extract_course_details(self, page: Page, platform: str) -> Dict:
        """Extract course details from a detail page."""
        # Wait for the main content to load
        await page.wait_for_selector('main, .course-content, .course-details', timeout=10000)

        html = await page.content()
        soup = BeautifulSoup(html, 'html.parser')

        # Platform-specific parsing
        if platform == 'coursera':
            return self._parse_coursera_details(soup)
        elif platform == 'udemy':
            return self._parse_udemy_details(soup)
        else:
            return self._parse_generic_details(soup)

    def _parse_coursera_details(self, soup: BeautifulSoup) -> Dict:
        """Parse a Coursera course detail page."""
        details = {}

        try:
            # Instructors
            instructor_elements = soup.select('div.instructor-count, span.instructor-name')
            details['instructors'] = [elem.get_text(strip=True) for elem in instructor_elements]

            # Rating
            rating_elem = soup.select_one('div.ratings-text, span[data-testid="ratings-count"]')
            if rating_elem:
                rating_text = rating_elem.get_text(strip=True)
                details['rating'] = float(rating_text.split()[0])

            # Description
            desc_elem = soup.select_one('div.description, div.course-description')
            details['description'] = desc_elem.get_text(strip=True) if desc_elem else ""

            # Syllabus (first 10 weeks only)
            syllabus = []
            for i, item in enumerate(soup.select('div.accordion-item, div.syllabus-item')[:10]):
                week_elem = item.select_one('h3, .week-title')
                content_elem = item.select_one('.content, .topics')
                syllabus.append({
                    'week': week_elem.get_text(strip=True) if week_elem else f"Week {i + 1}",
                    'content': content_elem.get_text(strip=True) if content_elem else ""
                })
            details['syllabus'] = syllabus

        except Exception as e:
            logger.warning(f"Failed to parse Coursera details: {str(e)}")

        return details

    def _parse_udemy_details(self, soup: BeautifulSoup) -> Dict:
        """Parse a Udemy course detail page."""
        details = {}

        try:
            # Price
            price_elem = soup.select_one('div[data-purpose="price-text"]')
            if price_elem:
                price_text = price_elem.get_text(strip=True)
                details['price'] = self._extract_price(price_text)

            # Enrollment count
            students_elem = soup.select_one('div[data-purpose="enrollment"]')
            if students_elem:
                students_text = students_elem.get_text(strip=True)
                details['students_enrolled'] = self._extract_number(students_text)

            # Course length
            duration_elem = soup.select_one('div[data-purpose="content-length"]')
            if duration_elem:
                details['duration'] = duration_elem.get_text(strip=True)

        except Exception as e:
            logger.warning(f"Failed to parse Udemy details: {str(e)}")

        return details

    def _parse_generic_details(self, soup: BeautifulSoup) -> Dict:
        """Fallback for platforms without a dedicated parser (minimal: meta description only)."""
        meta = soup.select_one('meta[name="description"]')
        return {'description': meta.get('content', '').strip() if meta else ""}

    async def _save_course_data(self, course_data: Dict):
        """Save one course to the configured stores."""
        try:
            # Convert to a Course object
            course = Course(
                platform=course_data['platform'],
                title=course_data['title'],
                url=course_data['url'],
                instructors=course_data.get('instructors', []),
                rating=course_data.get('rating'),
                rating_count=course_data.get('rating_count'),
                duration=course_data.get('duration'),
                level=course_data.get('level'),
                language=course_data.get('language', 'English'),
                subtitle_languages=course_data.get('subtitle_languages', []),
                price=course_data.get('price'),
                original_price=course_data.get('original_price'),
                discount=course_data.get('discount'),
                students_enrolled=course_data.get('students_enrolled'),
                description=course_data.get('description', ''),
                learning_outcomes=course_data.get('learning_outcomes', []),
                syllabus=course_data.get('syllabus', []),
                category=course_data.get('category', 'Uncategorized'),
                subcategory=course_data.get('subcategory', ''),
                tags=course_data.get('tags', []),
                created_at=datetime.now(),
                updated_at=datetime.now(),
                popularity_score=self._calculate_popularity(course_data)
            )

            # Save to MongoDB
            if self.config.get('mongodb_uri'):
                await self._save_to_mongodb(course)

            # Save to PostgreSQL
            if self.config.get('postgresql_uri'):
                await self._save_to_postgresql(course)

            # Save to CSV (backup)
            await self._save_to_csv(course)

        except Exception as e:
            logger.error(f"Failed to save course data: {str(e)}")

    async def _save_to_mongodb(self, course: Course):
        """Upsert the course into MongoDB."""
        # A client per call keeps the example simple; reuse one client in production
        client = AsyncIOMotorClient(self.config['mongodb_uri'])
        db = client.education
        collection = db.courses

        # Use the course ID as the document key
        course_dict = course.to_dict()
        course_dict['_id'] = Course.generate_id(course.platform, course.title)

        # Update or insert
        await collection.update_one(
            {'_id': course_dict['_id']},
            {'$set': course_dict},
            upsert=True
        )

        client.close()

    async def _save_to_postgresql(self, course: Course):
        """Upsert the course into PostgreSQL."""
        conn = await asyncpg.connect(self.config['postgresql_uri'])

        await conn.execute('''
            INSERT INTO courses (
                id, platform, title, url, instructors, rating, rating_count,
                duration, level, language, subtitle_languages, price,
                original_price, discount, students_enrolled, description,
                learning_outcomes, syllabus, category, subcategory, tags,
                created_at, updated_at, popularity_score
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12,
                     $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
            ON CONFLICT (id) DO UPDATE SET
                updated_at = EXCLUDED.updated_at,
                rating = EXCLUDED.rating,
                students_enrolled = EXCLUDED.students_enrolled,
                price = EXCLUDED.price
        ''', *self._prepare_postgresql_data(course))

        await conn.close()

    async def _save_to_csv(self, course: Course):
        """Append the course to courses.csv (backup copy)."""
        df = pd.DataFrame([course.to_dict()])
        path = 'courses.csv'
        # Append instead of re-reading the whole file; write the header only once
        df.to_csv(path, mode='a', header=not os.path.exists(path),
                  index=False, encoding='utf-8-sig')

    def _calculate_popularity(self, course_data: Dict) -> float:
        """Compute a popularity score for the course."""
        score = 0.0

        # Enrollment tiers
        if course_data.get('students_enrolled'):
            students = course_data['students_enrolled']
            if students > 100000:
                score += 30
            elif students > 10000:
                score += 20
            elif students > 1000:
                score += 10

        # Rating
        if course_data.get('rating'):
            rating = course_data['rating']
            score += rating * 10

        # Discount
        if course_data.get('discount') and course_data['discount'] > 50:
            score += 5

        return min(score, 100)

    async def _inject_stealth_scripts(self, context: BrowserContext):
        """Inject scripts that mask common automation fingerprints."""
        await context.add_init_script("""
            // Hide the webdriver flag
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });

            // Fake a non-empty plugins list
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });

            // Override the languages list
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en']
            });

            // Provide the window.chrome object a regular Chrome exposes
            window.chrome = {
                runtime: {},
                loadTimes: function(){},
                csi: function(){},
                app: {}
            };

            // Patch the permissions API
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
        """)

    async def _handle_popups(self, page: Page):
        """Close popups if any appear (waits up to 3 s per selector)."""
        popup_selectors = [
            'button[aria-label="Close"]',
            'div[class*="modal"] button',
            'div[class*="popup"] button',
            'button[class*="close"]'
        ]

        for selector in popup_selectors:
            try:
                close_btn = await page.wait_for_selector(selector, timeout=3000)
                if close_btn:
                    await close_btn.click()
                    await asyncio.sleep(1)
            except Exception:
                # Selector not found within the timeout: no such popup
                continue

    async def _auto_scroll(self, page: Page):
        """Scroll to the bottom repeatedly to load more content."""
        scroll_pause_time = 1
        last_height = await page.evaluate('document.body.scrollHeight')

        for _ in range(10):  # maximum number of scrolls
            # Scroll to the bottom
            await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
            await asyncio.sleep(scroll_pause_time)

            # Stop once the page height no longer grows
            new_height = await page.evaluate('document.body.scrollHeight')
            if new_height == last_height:
                break
            last_height = new_height

    async def _get_proxy_config(self) -> Optional[Dict]:
        """Build the Playwright proxy settings, or None if no proxy is available."""
        proxy = await self.proxy_pool.get_random_proxy()
        if not proxy:
            return None
        proxy_config = {'server': proxy}
        # Only pass credentials that are actually configured
        if self.config.get('proxy_username'):
            proxy_config['username'] = self.config['proxy_username']
            proxy_config['password'] = self.config.get('proxy_password', '')
        return proxy_config

    def _build_search_url(self, base_url: str, params: Dict) -> str:
        """Build the search URL."""
        query_string = urlencode(params)
        return f"{base_url}?{query_string}"

    def _extract_price(self, text: str) -> Optional[float]:
        """Extract the first price-like number from text."""
        match = re.search(r'\d[\d,]*(?:\.\d+)?', text)
        return float(match.group().replace(',', '')) if match else None

    def _extract_number(self, text: str) -> Optional[int]:
        """Extract the first integer from text."""
        match = re.search(r'\d[\d,]*', text)
        return int(match.group().replace(',', '')) if match else None

    def _prepare_postgresql_data(self, course: Course) -> List:
        """Order the course fields to match the INSERT statement."""
        return [
            Course.generate_id(course.platform, course.title),
            course.platform,
            course.title,
            course.url,
            course.instructors,
            course.rating,
            course.rating_count,
            course.duration,
            course.level,
            course.language,
            course.subtitle_languages,
            course.price,
            course.original_price,
            course.discount,
            course.students_enrolled,
            course.description,
            course.learning_outcomes,
            json.dumps(course.syllabus),
            course.category,
            course.subcategory,
            course.tags,
            course.created_at,
            course.updated_at,
            course.popularity_score
        ]


class Scheduler:
    """Crawl scheduler."""

    def __init__(self, crawler: EducationCrawler):
        self.crawler = crawler
        self.platforms = [
            {
                'name': 'coursera',
                'base_url': 'https://www.coursera.org/search',
                'params': {'query': 'python data science machine learning'}
            },
            {
                'name': 'udemy',
                'base_url': 'https://www.udemy.com/courses/search',
                'params': {'q': 'programming', 'lang': 'en'}
            }
        ]

    async def run(self):
        """Run all platform crawls."""
        logger.info("Starting crawl tasks")

        # The semaphore must be acquired inside each task to limit concurrency
        semaphore = asyncio.Semaphore(2)  # crawl at most 2 platforms at once

        async def crawl_limited(platform: Dict):
            async with semaphore:
                await self.crawler.crawl_platform(
                    platform['name'],
                    platform['base_url'],
                    platform['params']
                )

        await asyncio.gather(
            *(crawl_limited(platform) for platform in self.platforms),
            return_exceptions=True
        )

        self._print_statistics()

    def _print_statistics(self):
        """Log the run statistics."""
        logger.info("=" * 50)
        logger.info("Crawl finished")
        logger.info(f"Total crawled: {self.crawler.stats['total_crawled']}")
        logger.info(f"Succeeded: {self.crawler.stats['success']}")
        logger.info(f"Failed: {self.crawler.stats['failed']}")
        logger.info(f"Skipped as duplicates: {self.crawler.stats['duplicates']}")
        logger.info("=" * 50)


async def init_database(postgresql_uri: str):
    """Create the courses table and its indexes (PostgreSQL)."""
    conn = await asyncpg.connect(postgresql_uri)
    try:
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS courses (
                id VARCHAR(50) PRIMARY KEY,
                platform VARCHAR(50) NOT NULL,
                title TEXT NOT NULL,
                url TEXT NOT NULL,
                instructors TEXT[] DEFAULT '{}',
                rating FLOAT,
                rating_count INTEGER,
                duration VARCHAR(100),
                level VARCHAR(50),
                language VARCHAR(50),
                subtitle_languages TEXT[] DEFAULT '{}',
                price FLOAT,
                original_price FLOAT,
                discount FLOAT,
                students_enrolled INTEGER,
                description TEXT,
                learning_outcomes TEXT[] DEFAULT '{}',
                syllabus JSONB,
                category VARCHAR(100),
                subcategory VARCHAR(100),
                tags TEXT[] DEFAULT '{}',
                created_at TIMESTAMP NOT NULL,
                updated_at TIMESTAMP NOT NULL,
                popularity_score FLOAT
            )
        ''')
        # PostgreSQL has no inline INDEX clause; create the indexes separately
        await conn.execute('''
            CREATE INDEX IF NOT EXISTS idx_platform ON courses (platform);
            CREATE INDEX IF NOT EXISTS idx_category ON courses (category);
            CREATE INDEX IF NOT EXISTS idx_price ON courses (price);
            CREATE INDEX IF NOT EXISTS idx_rating ON courses (rating DESC);
            CREATE INDEX IF NOT EXISTS idx_popularity ON courses (popularity_score DESC);
        ''')
    finally:
        await conn.close()
    logger.info("Database tables initialized")


async def main():
    """Entry point."""
    config = {
        'headless': False,  # shows the browser for debugging; use True on servers
        'concurrency': 5,   # concurrent detail pages
        'max_courses': 50,  # maximum courses per platform
        'base_urls': {
            'coursera': 'https://www.coursera.org',
            'udemy': 'https://www.udemy.com'
        },
        'mongodb_uri': 'mongodb://localhost:27017',
        'postgresql_uri': 'postgresql://user:password@localhost:5432/education'
    }

    # Create the PostgreSQL table before crawling
    await init_database(config['postgresql_uri'])

    # Create the crawler
    crawler = EducationCrawler(config)
    await crawler.init_resources()

    # Create the scheduler and run
    scheduler = Scheduler(crawler)
    await scheduler.run()


if __name__ == "__main__":
    asyncio.run(main())
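The scoring rule in `_calculate_popularity` is easy to check by hand. The sketch below restates it as a standalone function (`popularity_score` is a hypothetical name used only for this illustration) so the arithmetic can be run without a browser or database.

```python
from typing import Optional


def popularity_score(
    students: Optional[int],
    rating: Optional[float],
    discount: Optional[float],
) -> float:
    """Standalone restatement of the crawler's popularity formula."""
    score = 0.0
    # Enrollment tiers: +30 above 100k, +20 above 10k, +10 above 1k
    if students:
        if students > 100_000:
            score += 30
        elif students > 10_000:
            score += 20
        elif students > 1_000:
            score += 10
    # Rating contributes 10 points per star
    if rating:
        score += rating * 10
    # A discount above 50 adds a small bonus
    if discount and discount > 50:
        score += 5
    return min(score, 100.0)


# 50,000 students (+20) and a 4.5 rating (+45), no discount
print(popularity_score(50_000, 4.5, None))   # 65.0
# 250,000 students (+30), 4.5 rating (+45), discount of 60 (+5)
print(popularity_score(250_000, 4.5, 60))    # 80.0
```

With ratings on a 0 to 5 scale the highest reachable score is 30 + 50 + 5 = 85, so the cap at 100 only matters if a platform reports ratings on a larger scale.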

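IV. Advanced Features and Optimization

1. Anti-bot Countermeasures

python

import asyncio
import random
from typing import Dict

from fake_useragent import UserAgent
from playwright.async_api import Page


class AntiAntiCrawler:
    """Helpers that make the crawler look less like an automated client."""

    @staticmethod
    def rotate_user_agents() -> Dict[str, str]:
        """Return request headers with a randomly chosen User-Agent."""
        ua = UserAgent()
        return {
            'User-Agent': ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

    @staticmethod
    async def simulate_human_delay():
        """Pause for 1-5 seconds without blocking the event loop."""
        await asyncio.sleep(random.uniform(1, 5))

    @staticmethod
    async def random_mouse_movement(page: Page):
        """Move the mouse to a random point; must be async because it awaits."""
        await page.mouse.move(
            random.randint(0, 1000),
            random.randint(0, 700)
        )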

2. Distributed Crawling Support

python

import json
from typing import Dict, List

import redis.asyncio as redis


class DistributedCrawler:
    """Coordinator that shares crawl tasks through a Redis list."""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.task_queue = 'crawler_tasks'

    async def distribute_tasks(self, tasks: List[Dict]):
        """Push tasks onto the queue."""
        for task in tasks:
            await self.redis.lpush(self.task_queue, json.dumps(task))

    async def consume_tasks(self, worker_id: str):
        """Consume tasks forever; brpop returns None after a 30 s timeout."""
        while True:
            task_json = await self.redis.brpop(self.task_queue, timeout=30)
            if task_json:
                # brpop returns a (queue_name, payload) pair
                task = json.loads(task_json[1])
                await self.process_task(task, worker_id)

    async def process_task(self, task: Dict, worker_id: str):
        """Process one task."""
        # Task handling logic goes here
        pass
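The consume loop above needs a running Redis server. To see the push/pop flow in isolation, the sketch below swaps the Redis list for an in-memory `asyncio.Queue`; this is a stand-in for illustration only, and `worker` and `run_workers` are hypothetical names.

```python
import asyncio
import json
from typing import Dict, List


async def worker(worker_id: str, queue: asyncio.Queue, done: List[Dict]) -> None:
    """Pull JSON-encoded tasks until the queue is empty."""
    while True:
        try:
            task_json = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # nothing left to do
        task = json.loads(task_json)
        await asyncio.sleep(0)  # placeholder for the real crawl; yields control
        done.append({"worker": worker_id, "url": task["url"]})
        queue.task_done()


async def run_workers(urls: List[str], n_workers: int = 2) -> List[Dict]:
    """Enqueue one task per URL and drain the queue with several workers."""
    queue: asyncio.Queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(json.dumps({"url": url}))
    done: List[Dict] = []
    await asyncio.gather(*(worker(f"w{i}", queue, done) for i in range(n_workers)))
    return done
```

Each task is taken by exactly one worker, which is the same guarantee `brpop` gives when several crawler processes read from one Redis list.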

3. Data Quality Monitoring

python

import logging
from typing import Dict

logger = logging.getLogger(__name__)


class DataQualityMonitor:
    """Data quality checks."""

    @staticmethod
    def validate_course_data(course: Dict) -> bool:
        """Check that a course record is complete and well-formed."""
        required_fields = ['title', 'url', 'platform', 'description']

        for field in required_fields:
            if not course.get(field):
                logger.warning(f"Course is missing required field: {field}")
                return False

        # Validate the URL scheme
        if not course['url'].startswith(('http://', 'https://')):
            logger.warning(f"Malformed course URL: {course['url']}")
            return False

        # Validate the rating range
        if course.get('rating') is not None:
            if not 0 <= course['rating'] <= 5:
                logger.warning(f"Rating out of range: {course['rating']}")
                return False

        return True

V. Deployment and Monitoring

Docker deployment

dockerfile

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt \
    && playwright install chromium \
    && playwright install-deps

COPY . .

# The container has no display: set 'headless': True in the crawler config
CMD ["python", "online_education_crawler.py"]

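Monitoring dashboard

python

# monitoring_dashboard.py
from datetime import datetime
from typing import Dict

import psutil
from flask import Flask, jsonify

app = Flask(__name__)

# Shared stats dict. The crawler has to update this same object (for example
# by assigning it to EducationCrawler.stats) when both run in one process.
crawler_stats: Dict[str, int] = {
    'total_crawled': 0,
    'success': 0,
    'failed': 0,
    'duplicates': 0
}


@app.route('/metrics')
def get_metrics():
    """Return crawler and host metrics."""
    return jsonify({
        'timestamp': datetime.now().isoformat(),
        'cpu_percent': psutil.cpu_percent(),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_usage': psutil.disk_usage('/').percent,
        'crawler_stats': crawler_stats
    })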

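VI. Legal and Ethical Considerations

  1. Respect robots.txt: always check the target site's robots.txt file before crawling

  2. Rate limiting: keep the request rate reasonable so the target server is not put under load

  3. Data use: use the data only for personal study or research

  4. Copyright: respect the copyright of course content and do not use it commercially

  5. Privacy: collect only public course information, never personal data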
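The first two points can be checked in code with the standard library's `urllib.robotparser`. The sketch below parses an in-memory robots.txt so it runs offline; the sample rules, the `my-crawler` agent name, and the `build_parser` helper are assumptions made for the example.

```python
from urllib.robotparser import RobotFileParser


def build_parser(robots_txt: str) -> RobotFileParser:
    """Parse robots.txt content that has already been downloaded."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser


# Sample rules for illustration; a real crawler would fetch the site's own file
SAMPLE_ROBOTS = """\
User-agent: *
Disallow: /admin/
Crawl-delay: 2
"""

parser = build_parser(SAMPLE_ROBOTS)
print(parser.can_fetch("my-crawler", "https://example.com/courses/python"))  # True
print(parser.can_fetch("my-crawler", "https://example.com/admin/users"))     # False
print(parser.crawl_delay("my-crawler"))                                      # 2
```

Against a live site, `set_url(...)` followed by `read()` loads the real file, and the value from `crawl_delay` can feed the delay used between requests.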

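VII. Performance Optimization Tips

  1. Connection pooling: reuse aiohttp's connection pool to avoid repeated TCP handshakes

  2. Static asset caching: cache static resources locally instead of downloading them again

  3. Incremental crawling: crawl only courses that have changed

  4. Resumable crawls: persist task state so an interrupted run can continue

  5. Compressed transfer: enable gzip compression to reduce network traffic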
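Points 3 and 4 can share one mechanism: store a fingerprint per course URL in a small state file, skip courses whose fingerprint is unchanged, and reload the file after a restart. The sketch below is one possible approach under that assumption; `fingerprint`, `CrawlState`, and the JSON file layout are hypothetical.

```python
import hashlib
import json
from pathlib import Path
from typing import Dict


def fingerprint(course: Dict) -> str:
    """Stable hash of a course record (key order does not matter)."""
    payload = json.dumps(course, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class CrawlState:
    """Maps course URL to its last saved fingerprint, persisted as JSON."""

    def __init__(self, path: Path):
        self.path = path
        self.seen: Dict[str, str] = {}
        if path.exists():
            # Resume from the state written by a previous run
            self.seen = json.loads(path.read_text(encoding="utf-8"))

    def needs_update(self, course: Dict) -> bool:
        """True for new courses and for courses whose content changed."""
        return self.seen.get(course["url"]) != fingerprint(course)

    def mark_done(self, course: Dict) -> None:
        """Record the course and write the state file immediately."""
        self.seen[course["url"]] = fingerprint(course)
        self.path.write_text(json.dumps(self.seen), encoding="utf-8")
```

Writing the file after every course is simple but slow for large crawls; batching the writes is a reasonable refinement.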
