news 2026/4/3 5:34:09

Python Crawlers in Practice: Scraping Programming Tutorial Resources with Modern Async Techniques


张小明

Front-End Developer


1. Introduction: Why a New Generation of Crawler Technology?

In today's era of information overload, programming tutorial resources are scattered across countless sites and platforms, and collecting them by hand is slow and inefficient. A traditional synchronous crawler bogs down under a large number of requests, while a new generation of crawlers built on asynchronous I/O can service hundreds of requests concurrently, dramatically improving collection throughput. This article walks through building an efficient, stable system for scraping programming tutorial resources with Python's modern async tooling.
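The throughput claim is easy to demonstrate with nothing but the standard library: when each request spends most of its time waiting on the network, `asyncio.gather` lets those waits overlap. A minimal sketch (the URLs are placeholders and `asyncio.sleep` stands in for network latency):

```python
import asyncio
import time

async def fake_fetch(url: str) -> str:
    # Stand-in for a network request: each one "waits" 0.1 s.
    await asyncio.sleep(0.1)
    return url

async def crawl_all():
    urls = [f"https://example.com/page/{i}" for i in range(50)]
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_fetch(u) for u in urls))
    return len(results), time.perf_counter() - start

count, elapsed = asyncio.run(crawl_all())
# 50 concurrent "requests" finish in roughly 0.1 s, not 50 * 0.1 = 5 s
print(count, round(elapsed, 2))
```

Run synchronously, the same 50 waits would take about five seconds; overlapped, the wall time is close to the longest single wait.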

2. Technology Stack: Core Components of a Modern Python Crawler

2.1 Core Frameworks

  • aiohttp: async HTTP client/server framework with WebSocket support

  • httpx: next-generation HTTP client supporting both sync and async requests

  • playwright: browser-automation tool from Microsoft that handles modern JavaScript rendering

2.2 Parsing and Storage

  • parsel: HTML/XML parsing library from the Scrapy team

  • BeautifulSoup4: the classic HTML parsing library

  • asyncpg: async PostgreSQL client

  • aiofiles: async file I/O library

2.3 Async Ecosystem

  • asyncio: Python's built-in async I/O framework

  • anyio: high-level abstraction layer for async programming

  • uvloop: drop-in ultra-fast event loop (2–4x faster)
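Unbounded concurrency will exhaust sockets or trip rate limits, so these building blocks are usually combined with a concurrency cap. `asyncio.Semaphore` does this without any third-party dependency; a minimal sketch (the URLs and the 0.01 s delay are placeholders):

```python
import asyncio

async def bounded_fetch(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # at most `limit` coroutines run this section at once
        await asyncio.sleep(0.01)  # stand-in for the real request
        return url

async def crawl(limit: int = 10) -> list:
    sem = asyncio.Semaphore(limit)
    urls = [f"https://example.com/{i}" for i in range(100)]
    return await asyncio.gather(*(bounded_fetch(sem, u) for u in urls))

results = asyncio.run(crawl())
print(len(results))  # 100
```

The full crawler below takes the same approach at the connection level via `aiohttp.TCPConnector(limit=...)`.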

3. Full Implementation: An Async Crawler System

python

"""
Modern async crawler for programming tutorial resources.
Supports concurrent fetching, automatic deduplication, resumable
crawls, and basic anti-bot evasion.
"""
import asyncio
import aiohttp
import aiofiles
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import hashlib
import json
import logging
from datetime import datetime
from contextlib import asynccontextmanager
import ssl
import certifi

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('crawler.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class TutorialResource:
    """Data model for a tutorial resource."""
    title: str
    url: str
    description: str
    category: str
    file_type: str  # pdf, video, zip, ...
    file_size: str
    download_url: str
    source_site: str
    upload_date: str
    tags: List[str]
    language: str = "Chinese"
    difficulty: str = "intermediate"
    rating: float = 0.0


class AsyncTutorialCrawler:
    """Core async crawler for tutorial resources."""

    def __init__(self, max_concurrent: int = 100,
                 request_timeout: int = 30,
                 use_proxy: bool = False):
        """
        Args:
            max_concurrent: maximum number of concurrent connections
            request_timeout: request timeout in seconds
            use_proxy: whether to route requests through a proxy
        """
        self.max_concurrent = max_concurrent
        self.request_timeout = request_timeout
        self.use_proxy = use_proxy
        self.visited_urls = set()
        self.resources = []

        # SSL context
        self.ssl_context = ssl.create_default_context(cafile=certifi.where())

        # Browser-like request headers
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
        }

        # Per-site configuration (examples)
        self.site_configs = {
            'tutorialspoint': {
                'base_url': 'https://www.tutorialspoint.com',
                'selectors': {
                    'resource_list': '.resource-list > div',
                    'title': 'h2 a::text',
                    'url': 'h2 a::attr(href)',
                    'description': '.description::text',
                    'category': '.category::text',
                    'file_type': '.file-type::text'
                }
            },
            'freecodecamp': {
                'base_url': 'https://www.freecodecamp.org',
                'api_endpoint': '/api/v1/tutorials'
            }
        }

    @asynccontextmanager
    async def create_session(self):
        """Context manager yielding a configured aiohttp session."""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            ssl=self.ssl_context,
            force_close=True,
            enable_cleanup_closed=True
        )
        timeout = aiohttp.ClientTimeout(total=self.request_timeout)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers=self.headers
        ) as session:
            yield session

    def generate_url_hash(self, url: str) -> str:
        """MD5 hash of a URL, used for deduplication."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    async def fetch_with_retry(self, session: aiohttp.ClientSession,
                               url: str, retries: int = 3,
                               delay: float = 1.0) -> Optional[str]:
        """
        GET a URL with retries and exponential backoff.

        Args:
            session: aiohttp session
            url: target URL
            retries: number of attempts
            delay: base retry delay in seconds

        Returns:
            Response body as text, or None on failure.
        """
        url_hash = self.generate_url_hash(url)

        # Skip URLs we have already fetched
        if url_hash in self.visited_urls:
            logger.debug(f"Skipping already-visited URL: {url}")
            return None

        for attempt in range(retries):
            try:
                async with session.get(url, ssl=self.ssl_context) as response:
                    if response.status == 200:
                        self.visited_urls.add(url_hash)
                        text = await response.text()
                        logger.info(f"Fetched: {url} (attempt {attempt + 1})")
                        return text
                    elif response.status == 429:  # Too Many Requests
                        wait_time = delay * (2 ** attempt)  # exponential backoff
                        logger.warning(f"Rate limited, waiting {wait_time} s")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"HTTP error {response.status}: {url}")
                        return None
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{retries}): {url} - {e}")
                if attempt < retries - 1:
                    await asyncio.sleep(delay * (2 ** attempt))
                else:
                    logger.error(f"Request failed permanently: {url}")
                    return None
        return None

    async def parse_tutorialspoint(self, html: str,
                                   base_url: str) -> List[TutorialResource]:
        """Parse a tutorialspoint listing page."""
        from parsel import Selector

        selector = Selector(html)
        resources = []
        config = self.site_configs['tutorialspoint']

        for item in selector.css(config['selectors']['resource_list']):
            try:
                title = item.css(config['selectors']['title']).get('').strip()
                relative_url = item.css(config['selectors']['url']).get('')
                url = urljoin(base_url, relative_url)

                resource = TutorialResource(
                    title=title,
                    url=url,
                    description=item.css(config['selectors']['description']).get('')[:200],
                    category=item.css(config['selectors']['category']).get('uncategorized'),
                    file_type=item.css(config['selectors']['file_type']).get('unknown'),
                    file_size="pending",
                    download_url="",  # requires parsing the detail page
                    source_site="tutorialspoint",
                    upload_date=datetime.now().strftime("%Y-%m-%d"),
                    tags=["programming", "tutorial"]
                )
                resources.append(resource)
            except Exception as e:
                logger.error(f"Parse failure: {e}")
                continue
        return resources

    async def parse_freecodecamp_api(self, data: dict) -> List[TutorialResource]:
        """Parse the freecodecamp API payload."""
        resources = []
        for item in data.get('tutorials', []):
            try:
                resource = TutorialResource(
                    title=item.get('title', ''),
                    url=item.get('url', ''),
                    description=item.get('description', ''),
                    category=item.get('category', 'programming'),
                    file_type='text',  # mostly articles
                    file_size='N/A',
                    download_url=item.get('download_url', ''),
                    source_site="freecodecamp",
                    upload_date=item.get('published_at', ''),
                    tags=item.get('tags', []),
                    difficulty=item.get('difficulty', 'beginner'),
                    rating=float(item.get('rating', 0))
                )
                resources.append(resource)
            except Exception as e:
                logger.error(f"Failed to parse API data: {e}")
                continue
        return resources

    async def download_resource(self, session: aiohttp.ClientSession,
                                resource: TutorialResource,
                                save_dir: str = "downloads"):
        """Download a resource file asynchronously."""
        if not resource.download_url:
            return
        try:
            async with session.get(resource.download_url,
                                   ssl=self.ssl_context) as response:
                if response.status == 200:
                    # Build a filename from the title plus a type-based extension
                    filename = (resource.title.replace(' ', '_')
                                + self._get_file_extension(resource.file_type))
                    filepath = f"{save_dir}/{filename}"

                    # Write the file asynchronously
                    async with aiofiles.open(filepath, 'wb') as f:
                        await f.write(await response.read())
                    logger.info(f"Download complete: {filename}")
                    if response.content_length:
                        resource.file_size = f"{response.content_length / 1024 / 1024:.2f} MB"
        except Exception as e:
            logger.error(f"Download failed {resource.title}: {e}")

    def _get_file_extension(self, file_type: str) -> str:
        """Map a file type to a file extension."""
        extensions = {
            'pdf': '.pdf',
            'video': '.mp4',
            'zip': '.zip',
            'text': '.txt',
            'html': '.html',
            'markdown': '.md'
        }
        return extensions.get(file_type.lower(), '.bin')

    async def crawl_site(self, site_name: str, start_url: str = None):
        """Crawl a single configured site."""
        async with self.create_session() as session:
            if site_name == 'freecodecamp':
                # API-based extraction
                api_url = self.site_configs[site_name]['api_endpoint']
                full_url = self.site_configs[site_name]['base_url'] + api_url
                async with session.get(full_url) as response:
                    if response.status == 200:
                        data = await response.json()
                        resources = await self.parse_freecodecamp_api(data)
                        self.resources.extend(resources)
            else:
                # HTML-based extraction
                base_url = start_url or self.site_configs[site_name]['base_url']
                html = await self.fetch_with_retry(session, base_url)
                if html:
                    if site_name == 'tutorialspoint':
                        resources = await self.parse_tutorialspoint(html, base_url)
                        self.resources.extend(resources)
                    # add parsers for more sites here

    async def concurrent_crawl(self, sites: List[Dict]):
        """Crawl multiple sites concurrently."""
        tasks = [asyncio.create_task(self.crawl_site(site['name'], site.get('url')))
                 for site in sites]

        # Run all crawl tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Report per-task failures
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Crawl task failed {sites[i]['name']}: {result}")

    async def save_results(self, format: str = 'json'):
        """Persist crawl results to disk."""
        if not self.resources:
            logger.warning("No resources to save")
            return
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if format == 'json':
            filename = f"tutorial_resources_{timestamp}.json"
            data = [asdict(resource) for resource in self.resources]
            async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(data, ensure_ascii=False, indent=2))
            logger.info(f"Results saved to {filename}")
        elif format == 'csv':
            # csv.DictWriter is synchronous, so build the CSV in memory
            # first and write it out asynchronously in one go.
            import csv
            import io
            filename = f"tutorial_resources_{timestamp}.csv"
            buffer = io.StringIO()
            writer = csv.DictWriter(
                buffer, fieldnames=list(asdict(self.resources[0]).keys()))
            writer.writeheader()
            for resource in self.resources:
                writer.writerow(asdict(resource))
            async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
                await f.write(buffer.getvalue())
            logger.info(f"Results saved to {filename}")

    async def get_statistics(self) -> Dict:
        """Aggregate crawl statistics."""
        stats = {
            "total_resources": len(self.resources),
            "sites_count": len(set(r.source_site for r in self.resources)),
            "categories": {},
            "file_types": {}
        }
        for resource in self.resources:
            # Count per category
            stats["categories"][resource.category] = \
                stats["categories"].get(resource.category, 0) + 1
            # Count per file type
            stats["file_types"][resource.file_type] = \
                stats["file_types"].get(resource.file_type, 0) + 1
        return stats


async def main():
    """Entry point."""
    # Configure the crawler for moderately high concurrency
    crawler = AsyncTutorialCrawler(
        max_concurrent=50,
        request_timeout=45,
        use_proxy=False
    )

    # Sites to crawl
    sites_to_crawl = [
        {'name': 'tutorialspoint'},
        {'name': 'freecodecamp'},
        # add more sites here
    ]

    logger.info("Starting tutorial-resource crawl...")
    try:
        # Crawl all sites concurrently
        await crawler.concurrent_crawl(sites_to_crawl)

        # Optionally download resource files:
        # async with crawler.create_session() as session:
        #     download_tasks = [crawler.download_resource(session, r)
        #                       for r in crawler.resources[:10]]  # first 10 only
        #     await asyncio.gather(*download_tasks)

        # Persist results
        await crawler.save_results('json')
        await crawler.save_results('csv')

        # Report statistics
        stats = await crawler.get_statistics()
        logger.info("Crawl finished. Statistics:")
        logger.info(f"Total resources: {stats['total_resources']}")
        logger.info(f"Source sites: {stats['sites_count']}")
        logger.info(f"Category distribution: {stats['categories']}")
    except KeyboardInterrupt:
        logger.info("Crawl interrupted by user")
    except Exception as e:
        logger.error(f"Error during crawl: {e}")
    finally:
        logger.info("Crawler shut down")


if __name__ == "__main__":
    # Use uvloop for extra speed where available (Linux/macOS)
    try:
        import uvloop
        uvloop.install()
        logger.info("Using uvloop")
    except ImportError:
        logger.info("Using the standard asyncio event loop")

    asyncio.run(main())
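Two small pieces of the crawler are worth checking in isolation: the 429 handler retries with exponential backoff (`delay * 2 ** attempt`), and `generate_url_hash` deduplicates URLs via an MD5 digest. Both reduce to a few lines:

```python
import hashlib

def backoff_schedule(delay: float, retries: int) -> list:
    # Same formula as the 429 branch of fetch_with_retry
    return [delay * (2 ** attempt) for attempt in range(retries)]

def url_hash(url: str) -> str:
    # Same digest generate_url_hash stores in the visited set
    return hashlib.md5(url.encode('utf-8')).hexdigest()

print(backoff_schedule(1.0, 3))  # [1.0, 2.0, 4.0]
# Identical URLs map to the same hash, so repeats are caught
print(url_hash("https://example.com") == url_hash("https://example.com"))  # True
```

Doubling the wait on every retry keeps a rate-limited crawler from hammering the server while it is already refusing requests.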

4. Advanced Extensions

4.1 Dynamic Rendering with Playwright

python

async def crawl_js_heavy_site(url: str):
    """Crawl a site that renders its content with JavaScript."""
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        )
        page = await context.new_page()
        await page.goto(url, wait_until='networkidle')

        # Wait for the content to appear
        await page.wait_for_selector('.resource-item', timeout=10000)

        # Grab the fully rendered HTML
        content = await page.content()
        await browser.close()
        return content

4.2 Distributed Crawling

python

from typing import List

import redis.asyncio as redis
from celery import Celery

# Redis connection pool
redis_pool = redis.ConnectionPool.from_url('redis://localhost:6379/0')

class DistributedCrawler:
    def __init__(self):
        self.redis = redis.Redis(connection_pool=redis_pool)
        self.celery_app = Celery('crawler', broker='redis://localhost:6379/0')

    async def distribute_tasks(self, urls: List[str]):
        """Push crawl tasks onto a shared queue for workers to consume."""
        for url in urls:
            await self.redis.lpush('crawl_queue', url)

4.3 Smart Rate Limiting and Anti-Scraping Countermeasures

python

import asyncio

class RateLimiter:
    """Sliding-window request rate limiter."""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.request_times = []

    async def acquire(self):
        """Block until a request slot is available."""
        now = asyncio.get_event_loop().time()

        # Drop records that have left the 60-second window
        self.request_times = [t for t in self.request_times if now - t < 60]

        if len(self.request_times) >= self.requests_per_minute:
            # Wait until the oldest request leaves the window
            oldest = self.request_times[0]
            wait_time = 60 - (now - oldest)
            if wait_time > 0:
                await asyncio.sleep(wait_time)
        self.request_times.append(now)
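The window-pruning arithmetic in `acquire` can be verified with a plain function and a fake clock. This sketch re-implements just that decision, not the class itself:

```python
def window_check(request_times, now, limit, window=60.0):
    """Mirror RateLimiter.acquire's decision: may we send now, and
    if not, how long until the oldest request leaves the window?"""
    recent = [t for t in request_times if now - t < window]
    if len(recent) < limit:
        return True, 0.0
    return False, window - (now - recent[0])

# Limit of 3 per minute, requests already sent at t = 0, 1, 2 s;
# a 4th request at t = 10 s must wait until t = 60 s.
allowed, wait = window_check([0.0, 1.0, 2.0], now=10.0, limit=3)
print(allowed, wait)  # False 50.0
```

Injecting `now` instead of reading the event-loop clock makes the logic trivially unit-testable.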

5. Deployment and Monitoring

5.1 Docker Deployment

dockerfile

FROM python:3.11-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Source code
COPY . .

# Run the crawler
CMD ["python", "-m", "crawler.main"]
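The Dockerfile copies a `requirements.txt` that the article does not show. One plausible version, listing only the libraries this article actually uses (unpinned here; pin exact versions for reproducible builds):

```text
aiohttp
aiofiles
certifi
httpx
parsel
beautifulsoup4
asyncpg
playwright
uvloop
redis
celery
prometheus-client
scikit-learn
```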

5.2 Monitoring

python

from prometheus_client import start_http_server, Counter, Histogram

# Metric definitions
REQUESTS_TOTAL = Counter('crawler_requests_total', 'Total requests')
REQUEST_DURATION = Histogram('crawler_request_duration_seconds', 'Request duration')

# start_http_server(8000)  # expose the metrics endpoint on :8000

@REQUEST_DURATION.time()
async def monitored_fetch(session, url):
    """Request wrapper that records metrics."""
    REQUESTS_TOTAL.inc()
    return await session.get(url)

6. Best Practices and Caveats

6.1 Respect robots.txt

python

import asyncio
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

async def check_robots_txt(url: str) -> bool:
    """Check whether url may be fetched under the site's robots.txt."""
    rp = RobotFileParser()
    base_url = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
    rp.set_url(f"{base_url}/robots.txt")
    # rp.read() does blocking network I/O, so run it in a worker thread
    await asyncio.to_thread(rp.read)
    return rp.can_fetch("*", url)
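`RobotFileParser` can also be exercised offline by feeding it rules directly via `parse`, which is handy for unit-testing the policy check without any network access:

```python
from urllib.robotparser import RobotFileParser

rules = """\
User-agent: *
Disallow: /private/
"""

rp = RobotFileParser()
rp.parse(rules.splitlines())

print(rp.can_fetch("*", "https://example.com/tutorials/python"))  # True
print(rp.can_fetch("*", "https://example.com/private/data"))      # False
```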

6.2 Data Cleaning and Deduplication

python

from typing import List

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def deduplicate_resources(resources: List[TutorialResource],
                          similarity_threshold: float = 0.9):
    """Deduplicate resources by title similarity."""
    titles = [r.title for r in resources]

    # TF-IDF vectors for the titles
    vectorizer = TfidfVectorizer()
    title_vectors = vectorizer.fit_transform(titles)

    # Pairwise cosine similarity
    similarity_matrix = cosine_similarity(title_vectors)

    # Mark near-duplicates (keep the first occurrence of each group)
    duplicates = set()
    for i in range(len(resources)):
        for j in range(i + 1, len(resources)):
            if similarity_matrix[i, j] > similarity_threshold:
                duplicates.add(j)

    return [r for idx, r in enumerate(resources) if idx not in duplicates]
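If pulling in scikit-learn just for title deduplication feels heavy, the standard library's `difflib.SequenceMatcher` gives a serviceable character-level similarity. A dependency-free sketch of the same idea (the 0.9 threshold mirrors the function above):

```python
from difflib import SequenceMatcher

def dedup_titles(titles, threshold: float = 0.9):
    """Keep a title only if it is not too similar to one already kept."""
    kept = []
    for title in titles:
        if all(SequenceMatcher(None, title.lower(), k.lower()).ratio() <= threshold
               for k in kept):
            kept.append(title)
    return kept

titles = [
    "Async Python Crawler Tutorial",
    "Async Python Crawler Tutorial!",  # near-duplicate
    "Docker Deployment Guide",
]
result = dedup_titles(titles)
print(result)
```

Character-level ratios are cruder than TF-IDF cosine similarity for long descriptions, but for short titles they catch the common punctuation- and spacing-only duplicates.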

7. Conclusion

This article has presented a complete recipe for building an efficient crawler on Python's modern async stack. By combining current libraries such as aiohttp and playwright with smart rate limiting, distributed deployment, and other advanced features, you can build a stable, high-throughput system for collecting programming tutorial resources. In real-world use, always:

  1. Honor each target site's robots.txt.

  2. Keep request intervals reasonable so you do not overload target servers.

  3. Respect copyright and download only resources that may be freely redistributed.

  4. Update the crawler regularly to keep up with site redesigns.

  5. Apply thorough data validation and cleaning.
