
Restaurant Review Data Collection with Playwright and Async: A Hands-On Guide to Next-Generation Python Web Scraping


张小明

Front-End Engineer


Introduction: Why Restaurant Review Data Matters, and Why It Is Hard to Collect

In today's digitized restaurant industry, review data has become a key information resource for consumer decisions, merchant operations, and market analysis. It spans user ratings, review text, average spend per person, recommended dishes, and other dimensions, making it valuable for market research, competitive analysis, and user-experience work. At the same time, as the major platforms' anti-scraping defenses mature, traditional crawling techniques face unprecedented challenges.

This article shows how to build an efficient, stable restaurant review collection system with a modern Python stack (Playwright + asyncio + structured parsing), complete with working code and best practices.

Table of Contents

Introduction: Why Restaurant Review Data Matters, and Why It Is Hard to Collect

Technology Choices: Why This Stack?

1. Playwright vs Selenium vs Requests

2. Core Toolchain

Hands-On Project: A Multi-Platform Restaurant Review Collector

System Architecture

Full Implementation

Advanced Features and Optimization Strategies

1. Smart Proxy-Pool Management

2. Distributed Spider Architecture

3. Data Quality Monitoring

Legal and Ethical Considerations

Compliant Crawling Practices

Performance Tuning Tips


Technology Choices: Why This Stack?

1. Playwright vs Selenium vs Requests

  • Playwright: developed by Microsoft; drives Chromium, Firefox, and WebKit; ships with built-in auto-waiting and a more modern API design

  • Async support: first-class async APIs, far outperforming traditional synchronous crawlers

  • Detection evasion: closer emulation of real browser behavior, which helps avoid anti-bot checks
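The async performance claim is easy to demonstrate even without a browser. In this sketch, `asyncio.sleep` stands in for network latency (the URLs are made up); run concurrently with `asyncio.gather`, ten fetches finish in roughly the time of one fetch rather than the sum:

```python
import asyncio
import time

async def fetch_one(url: str) -> str:
    # Stand-in for a real page fetch; each "request" takes ~0.2 s.
    await asyncio.sleep(0.2)
    return f"<html>{url}</html>"

async def fetch_all(urls):
    # gather() runs the fetches concurrently, so total wall time
    # approaches the slowest single fetch, not the sum of all of them.
    return await asyncio.gather(*(fetch_one(u) for u in urls))

urls = [f"https://example.com/shop/{i}" for i in range(10)]
start = time.perf_counter()
pages = asyncio.run(fetch_all(urls))
elapsed = time.perf_counter() - start
print(len(pages), elapsed < 1.0)   # 10 fetches in well under 10 × 0.2 s
```

A synchronous loop over the same coroutines would take about two seconds; the concurrent version takes about 0.2 s.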

2. Core Toolchain

  • Crawler framework: Playwright + asyncio

  • Parsing: BeautifulSoup4 / Parsel

  • Storage: SQLAlchemy + PostgreSQL / SQLite (the sample code below talks to PostgreSQL directly via asyncpg)

  • Proxy management: rotating smart proxy pool

  • Captcha handling: OCR recognition first, with manual solving as a fallback
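As a minimal illustration of the SQLite storage option, here is a sketch using the stdlib sqlite3 driver. The table and column names are illustrative; the `ON CONFLICT ... DO UPDATE` clause mirrors the upsert the PostgreSQL code uses later:

```python
import sqlite3

# In-memory database for demonstration; a real run would use a file path.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS restaurant_reviews (
        platform      TEXT,
        restaurant_id TEXT,
        reviewer_name TEXT,
        review_text   TEXT,
        rating        REAL,
        UNIQUE (platform, restaurant_id, reviewer_name)
    )
""")

def upsert_review(row: tuple) -> None:
    # Re-collecting the same review updates it instead of duplicating it.
    conn.execute("""
        INSERT INTO restaurant_reviews
            (platform, restaurant_id, reviewer_name, review_text, rating)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (platform, restaurant_id, reviewer_name)
        DO UPDATE SET review_text = excluded.review_text,
                      rating = excluded.rating
    """, row)

upsert_review(("dianping", "1001", "foodie_a", "Great hotpot", 4.5))
upsert_review(("dianping", "1001", "foodie_a", "Great hotpot, slow service", 4.0))
count, rating = conn.execute(
    "SELECT COUNT(*), MAX(rating) FROM restaurant_reviews").fetchone()
print(count, rating)   # 1 4.0 — second insert updated the existing row
```

Note that `ON CONFLICT ... DO UPDATE` requires SQLite 3.24 or newer.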

Hands-On Project: A Multi-Platform Restaurant Review Collector

System Architecture

```python
"""
Restaurant review collection system architecture
├── Scheduler         — crawl scheduling center
├── Fetcher           — page retrieval module
├── Parser            — data extraction module
├── Storage           — persistence module
├── Anti-Anti-Spider  — counter-detection module
└── Monitor           — monitoring and alerting
"""
```
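The boundary between the Fetcher and Parser modules above can be sketched as coroutines connected by an `asyncio.Queue`. This browser-free toy (fetching is simulated) shows how the stages decouple, with the bounded queue providing back-pressure:

```python
import asyncio

async def fetcher(urls, q):
    # Fetcher: pushes (url, raw_html) items; the network call is simulated.
    for url in urls:
        await asyncio.sleep(0)              # stand-in for the network round-trip
        await q.put((url, f"<html>{url}</html>"))
    await q.put(None)                       # sentinel: no more pages

async def parser(q, results):
    # Parser: consumes raw pages and emits structured records.
    while (item := await q.get()) is not None:
        url, html = item
        results.append({"url": url, "length": len(html)})

async def pipeline(urls):
    q = asyncio.Queue(maxsize=10)           # bounded queue = back-pressure
    results = []
    await asyncio.gather(fetcher(urls, q), parser(q, results))
    return results

records = asyncio.run(pipeline([f"https://example.com/p/{i}" for i in range(3)]))
print(len(records))   # 3
```

In the full system the same pattern extends with a Storage consumer and a Scheduler producer on either end.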

Full Implementation

```python
"""
Restaurant review scraper — an async solution built on Playwright.
Date: 2024
Version: 2.0
"""
import asyncio
import json
import logging
import random
import re
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional

import asyncpg
import pandas as pd
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from playwright.async_api import Page, async_playwright
from pydantic import BaseModel, Field
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from tenacity import retry, stop_after_attempt, wait_exponential

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('restaurant_spider.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# Data models
class RestaurantReview(BaseModel):
    """Data model for a single restaurant review."""
    platform: str = Field(description="Platform name")
    restaurant_id: str = Field(description="Restaurant ID")
    restaurant_name: str = Field(description="Restaurant name")
    average_rating: float = Field(description="Average rating")
    review_count: int = Field(description="Number of reviews")
    price_range: Optional[str] = Field(default=None, description="Price range")
    address: Optional[str] = Field(default=None, description="Address")
    phone: Optional[str] = Field(default=None, description="Phone number")
    review_content: Optional[str] = Field(default=None, description="Review text")
    reviewer_name: Optional[str] = Field(default=None, description="Reviewer name")
    reviewer_rating: Optional[float] = Field(default=None, description="Reviewer's rating")
    review_time: Optional[datetime] = Field(default=None, description="Review timestamp")
    useful_count: Optional[int] = Field(default=None, description="Helpful votes")
    images: Optional[List[str]] = Field(default=None, description="Review images")
    collected_at: datetime = Field(default_factory=datetime.now)


class ReviewSpiderConfig(BaseModel):
    """Spider configuration."""
    headless: bool = Field(default=True, description="Headless mode")
    proxy: Optional[str] = Field(default=None, description="Proxy server")
    timeout: int = Field(default=30000, description="Timeout (ms)")
    max_concurrent: int = Field(default=3, description="Max concurrent pages")
    delay_range: tuple = Field(default=(1, 3), description="Delay range (seconds)")
    max_retries: int = Field(default=3, description="Max retries")


class AsyncRestaurantSpider:
    """Core async spider for restaurant reviews."""

    def __init__(self, config: ReviewSpiderConfig):
        self.config = config
        self.playwright = None
        self.browser = None
        self.context = None
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.ua = UserAgent()
        self.console = Console()

    async def __aenter__(self):
        await self.init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def init_browser(self):
        """Start Playwright and open a stealth-configured browser context."""
        self.playwright = await async_playwright().start()
        launch_options = {
            'headless': self.config.headless,
            'timeout': self.config.timeout,
            'args': [
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage',
                '--no-sandbox',
                f'--user-agent={self.ua.random}',
            ]
        }
        if self.config.proxy:
            launch_options['proxy'] = {'server': self.config.proxy}
        self.browser = await self.playwright.chromium.launch(**launch_options)
        # Context that mimics a real user
        self.context = await self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            locale='zh-CN',
            timezone_id='Asia/Shanghai',
            user_agent=self.ua.random
        )
        # Hide common automation fingerprints
        await self.context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            window.chrome = { runtime: {}, loadTimes: function(){},
                              csi: function(){}, app: {} };
        """)
        logger.info("Browser initialized")

    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(multiplier=1, min=2, max=10))
    async def fetch_page(self, url: str, page_num: Optional[int] = None) -> Optional[Page]:
        """Open a page and return it, or None on failure."""
        async with self.semaphore:
            page = None
            try:
                # Random delay to mimic human pacing
                await asyncio.sleep(random.uniform(*self.config.delay_range))
                page = await self.context.new_page()
                # Randomize the viewport
                await page.set_viewport_size({
                    'width': random.randint(1200, 1920),
                    'height': random.randint(800, 1080)
                })
                # Log requests and responses at DEBUG level
                page.on('request', lambda req: logger.debug(f"Request: {req.url}"))
                page.on('response', lambda res: logger.debug(f"Response: {res.status} {res.url}"))
                # Random mouse movement before navigating
                await self.simulate_human_behavior(page)
                logger.info(f"Visiting: {url}")
                response = await page.goto(url, timeout=self.config.timeout,
                                           wait_until='networkidle')
                if not response or response.status != 200:
                    logger.warning(f"Page load failed: {url}, "
                                   f"status: {getattr(response, 'status', 'unknown')}")
                    await page.close()
                    return None
                # Check whether anti-bot defenses were triggered
                if await self.check_anti_spider(page):
                    logger.warning("Anti-bot measures detected, trying to recover...")
                    await self.handle_anti_spider(page)
                # Scroll to load lazy content
                await self.scroll_page(page)
                return page
            except Exception as e:
                logger.error(f"Fetch failed: {url}, error: {e}")
                if page is not None:
                    await page.close()
                raise

    async def simulate_human_behavior(self, page: Page):
        """Random mouse movement and scrolling to look human."""
        for _ in range(random.randint(3, 7)):
            await page.mouse.move(random.randint(100, 1800),
                                  random.randint(100, 1000))
            await asyncio.sleep(random.uniform(0.1, 0.5))
        for _ in range(random.randint(2, 5)):
            await page.evaluate(f"window.scrollBy(0, {random.randint(200, 800)})")
            await asyncio.sleep(random.uniform(0.2, 1))

    async def scroll_page(self, page: Page):
        """Scroll through the page so dynamically loaded content renders."""
        scroll_height = await page.evaluate("document.body.scrollHeight")
        current_position = 0
        scroll_step = random.randint(300, 600)
        while current_position < scroll_height:
            await page.evaluate(f"window.scrollTo(0, {current_position})")
            await asyncio.sleep(random.uniform(0.5, 1.5))
            current_position += scroll_step
            # Occasional longer pause
            if random.random() > 0.7:
                await asyncio.sleep(random.uniform(1, 3))

    async def check_anti_spider(self, page: Page) -> bool:
        """Detect common anti-bot markers. The text probes stay in Chinese
        because the target sites render Chinese pages."""
        checks = [
            page.locator("text=验证码"),           # "captcha"
            page.locator("text=访问过于频繁"),      # "too many requests"
            page.locator("text=请完成验证"),        # "please complete verification"
            page.locator("text=Security Check"),
            page.locator(".captcha"),
            page.locator("#challenge-form"),
        ]
        for check in checks:
            if await check.count() > 0:
                return True
        # A suspiciously small or robot-flagged page also counts
        content = await page.content()
        return len(content) < 1000 or "robot" in content.lower()

    async def handle_anti_spider(self, page: Page):
        """Basic recovery: reload, wait, then escalate."""
        await page.reload(wait_until='networkidle')
        await asyncio.sleep(random.uniform(3, 7))
        if await page.locator(".captcha").count() > 0:
            logger.warning("Captcha present; solve manually or rotate the proxy")
            # Hook a captcha-solving service here, or pause for manual intervention

    async def parse_dianping_restaurant(self, page: Page) -> List[RestaurantReview]:
        """Parse a Dianping restaurant page."""
        reviews = []
        try:
            soup = BeautifulSoup(await page.content(), 'html.parser')
            # Restaurant name
            name_elem = soup.select_one('.shop-name')
            restaurant_name = name_elem.get_text(strip=True) if name_elem else "unknown"
            # Restaurant rating
            rating_elem = soup.select_one('.brief-info .num')
            average_rating = float(rating_elem.get_text(strip=True)) if rating_elem else 0.0
            # Review count
            count_elem = soup.select_one('.review-amount .count')
            review_count = (int(count_elem.get_text(strip=True).replace(',', ''))
                            if count_elem else 0)
            # Review list
            for item in soup.select('.reviews-items .main-review'):
                words = item.select_one('.review-words')
                name = item.select_one('.name')
                score = item.select_one('.score')
                when = item.select_one('.time')
                useful = item.select_one('.useful-count')
                reviews.append(RestaurantReview(
                    platform="dianping",
                    restaurant_id=self.extract_restaurant_id(page.url),
                    restaurant_name=restaurant_name,
                    average_rating=average_rating,
                    review_count=review_count,
                    review_content=words.get_text(strip=True) if words else None,
                    reviewer_name=name.get_text(strip=True) if name else None,
                    reviewer_rating=float(score.get_text(strip=True)) if score else None,
                    review_time=(datetime.strptime(when.get_text(strip=True), '%Y-%m-%d')
                                 if when else None),
                    useful_count=int(useful.get_text(strip=True)) if useful else None,
                ))
        except Exception as e:
            logger.error(f"Failed to parse Dianping page: {e}")
        return reviews

    async def parse_meituan_restaurant(self, page: Page) -> List[RestaurantReview]:
        """Parse a Meituan restaurant page."""
        reviews = []
        try:
            # Meituan pages load most content dynamically; wait for it
            await page.wait_for_selector('.review-list', timeout=10000)
            soup = BeautifulSoup(await page.content(), 'html.parser')
            # The extraction logic depends on the live page structure;
            # omitted here for brevity
        except Exception as e:
            logger.error(f"Failed to parse Meituan page: {e}")
        return reviews

    def extract_restaurant_id(self, url: str) -> str:
        """Pull the restaurant ID out of a URL."""
        for pattern in (r'shop/(\d+)', r'poi/(\d+)', r'id=(\d+)', r'item/(\d+)'):
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return "unknown"

    async def save_to_database(self, reviews: List[RestaurantReview], db_url: str):
        """Upsert reviews into PostgreSQL."""
        conn = await asyncpg.connect(db_url)
        try:
            async with conn.transaction():
                for r in reviews:
                    await conn.execute('''
                        INSERT INTO restaurant_reviews
                            (platform, restaurant_id, restaurant_name, average_rating,
                             review_count, review_content, reviewer_name, reviewer_rating,
                             review_time, useful_count, collected_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                        ON CONFLICT (platform, restaurant_id, reviewer_name, review_time)
                        DO UPDATE SET review_content = EXCLUDED.review_content,
                                      useful_count = EXCLUDED.useful_count,
                                      collected_at = EXCLUDED.collected_at
                    ''', r.platform, r.restaurant_id, r.restaurant_name,
                         r.average_rating, r.review_count, r.review_content,
                         r.reviewer_name, r.reviewer_rating, r.review_time,
                         r.useful_count, r.collected_at)
            logger.info(f"Saved {len(reviews)} reviews")
        except Exception as e:
            logger.error(f"Database save failed: {e}")
        finally:
            await conn.close()

    async def save_to_csv(self, reviews: List[RestaurantReview], filename: str):
        """Dump reviews to a CSV file."""
        df = pd.DataFrame([r.dict() for r in reviews])
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        logger.info(f"Data written to {filename}")

    async def crawl_restaurant_list(self, search_urls: List[str], max_pages: int = 10):
        """Crawl paginated search results for each URL."""
        all_reviews = []
        with Progress(SpinnerColumn(),
                      TextColumn("[progress.description]{task.description}"),
                      console=self.console) as progress:
            task = progress.add_task("[cyan]Crawling restaurants...",
                                     total=len(search_urls))
            for url in search_urls:
                try:
                    for page_num in range(1, max_pages + 1):
                        sep = '&' if '?' in url else '?'
                        page = await self.fetch_page(f"{url}{sep}page={page_num}",
                                                     page_num)
                        if not page:
                            break
                        # Pick a parser based on the platform
                        if 'dianping' in url:
                            reviews = await self.parse_dianping_restaurant(page)
                        elif 'meituan' in url:
                            reviews = await self.parse_meituan_restaurant(page)
                        else:
                            reviews = []
                        all_reviews.extend(reviews)
                        logger.info(f"Page {page_num} done, {len(reviews)} reviews")
                        await page.close()
                        # Random delay so requests are not too fast
                        await asyncio.sleep(random.uniform(2, 5))
                except Exception as e:
                    logger.error(f"Crawl of {url} failed: {e}")
                progress.update(task, advance=1)
        return all_reviews

    async def close(self):
        """Shut down the browser and Playwright."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()


class RestaurantSpiderManager:
    """Orchestrates spiders from a JSON config file."""

    def __init__(self, config_path: str = "config.json"):
        self.config = self.load_config(config_path)

    def load_config(self, config_path: str) -> Dict:
        """Load the config file, falling back to sensible defaults."""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                "headless": True,
                "max_concurrent": 3,
                "delay_range": [1, 3],
                "database_url": "postgresql://user:password@localhost/restaurant_reviews",
                "platforms": [{
                    "name": "dianping",
                    "search_urls": [
                        "https://www.dianping.com/search/keyword/1/0_餐厅",
                        "https://www.dianping.com/search/keyword/1/0_火锅"
                    ],
                    "max_pages": 5
                }]
            }

    async def run(self):
        """Run the full collection job."""
        console = Console()
        console.print("[bold green]🚀 Restaurant review collector starting[/bold green]")
        spider_config = ReviewSpiderConfig(
            headless=self.config.get('headless', True),
            max_concurrent=self.config.get('max_concurrent', 3),
            delay_range=tuple(self.config.get('delay_range', [1, 3]))
        )
        async with AsyncRestaurantSpider(spider_config) as spider:
            all_reviews = []
            for platform_config in self.config.get('platforms', []):
                platform_name = platform_config['name']
                console.print(f"\n[bold cyan]Crawling {platform_name}...[/bold cyan]")
                reviews = await spider.crawl_restaurant_list(
                    platform_config['search_urls'],
                    platform_config.get('max_pages', 5))
                all_reviews.extend(reviews)
                console.print(f"[green]✓ {platform_name} done, "
                              f"{len(reviews)} reviews collected[/green]")
            # Persist results
            if all_reviews:
                db_url = self.config.get('database_url')
                if db_url:
                    await spider.save_to_database(all_reviews, db_url)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                await spider.save_to_csv(all_reviews,
                                         f"restaurant_reviews_{timestamp}.csv")
                self.display_statistics(all_reviews)
            console.print("[bold green]✨ Collection finished![/bold green]")

    def display_statistics(self, reviews: List[RestaurantReview]):
        """Print per-platform statistics."""
        console = Console()
        if not reviews:
            console.print("[yellow]⚠️ No data collected[/yellow]")
            return
        table = Table(title="Collection statistics", show_header=True,
                      header_style="bold magenta")
        table.add_column("Platform", style="cyan")
        table.add_column("Restaurants", justify="right")
        table.add_column("Reviews", justify="right")
        table.add_column("Avg rating", justify="right")
        platform_stats = defaultdict(
            lambda: {'restaurants': set(), 'reviews': 0, 'ratings': []})
        for review in reviews:
            stats = platform_stats[review.platform]
            stats['restaurants'].add(review.restaurant_id)
            stats['reviews'] += 1
            if review.reviewer_rating:
                stats['ratings'].append(review.reviewer_rating)
        for platform, stats in platform_stats.items():
            avg = (sum(stats['ratings']) / len(stats['ratings'])
                   if stats['ratings'] else 0)
            table.add_row(platform, str(len(stats['restaurants'])),
                          str(stats['reviews']), f"{avg:.1f}")
        console.print(table)


async def init_database():
    """Create the target table if it does not exist (run once before crawling)."""
    conn = await asyncpg.connect(
        'postgresql://user:password@localhost/restaurant_reviews')
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS restaurant_reviews (
            id SERIAL PRIMARY KEY,
            platform VARCHAR(50),
            restaurant_id VARCHAR(100),
            restaurant_name VARCHAR(200),
            average_rating DECIMAL(3,1),
            review_count INTEGER,
            review_content TEXT,
            reviewer_name VARCHAR(100),
            reviewer_rating DECIMAL(3,1),
            review_time TIMESTAMP,
            useful_count INTEGER,
            collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(platform, restaurant_id, reviewer_name, review_time)
        )
    ''')
    await conn.close()


async def main():
    """Entry point."""
    manager = RestaurantSpiderManager()
    try:
        await manager.run()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Spider run failed: {e}", exc_info=True)


if __name__ == "__main__":
    # asyncio.run(init_database())  # uncomment on first run to create the table
    asyncio.run(main())
```
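The script depends on several third-party packages. An assumed installation set, inferred from the imports above (pin versions to match your environment):

```shell
pip install playwright asyncpg pandas beautifulsoup4 fake-useragent \
    pydantic tenacity rich
# Playwright also needs browser binaries:
python -m playwright install chromium
```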

Advanced Features and Optimization Strategies

1. Smart Proxy-Pool Management

```python
class ProxyManager:
    """Smart proxy-pool manager."""

    def __init__(self):
        self.proxies = []
        self.blacklist = set()
        self.success_rate = {}

    async def get_proxy(self) -> str:
        """Return the best proxy, chosen by metrics such as success rate
        and response time; implements automatic rotation and failover."""
        pass
```
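One way to flesh out `get_proxy` is to weight selection by observed success rate and retire proxies that keep failing. A minimal synchronous sketch (the proxy URLs are made up, and the smoothing and blacklist thresholds are illustrative choices):

```python
import random

class SimpleProxyPool:
    """Score-based proxy selection: a sketch of the idea above."""

    def __init__(self, proxies):
        # Start every proxy at one success / one failure so new proxies
        # are neither favored nor starved (Laplace smoothing).
        self.stats = {p: {"ok": 1, "fail": 1} for p in proxies}
        self.blacklist = set()

    def report(self, proxy, success):
        # Callers report the outcome of each request made through a proxy.
        self.stats[proxy]["ok" if success else "fail"] += 1
        s = self.stats[proxy]
        if s["fail"] >= 5 and s["ok"] / (s["ok"] + s["fail"]) < 0.2:
            self.blacklist.add(proxy)   # retire consistently failing proxies

    def get_proxy(self):
        candidates = {p: s for p, s in self.stats.items()
                      if p not in self.blacklist}
        # Weight the choice by success rate: good proxies get more traffic.
        weights = [s["ok"] / (s["ok"] + s["fail"]) for s in candidates.values()]
        return random.choices(list(candidates), weights=weights, k=1)[0]

pool = SimpleProxyPool(["http://p1:8080", "http://p2:8080"])
for _ in range(10):
    pool.report("http://p2:8080", success=False)
proxy = pool.get_proxy()
print(proxy)   # http://p1:8080 — p2 was blacklisted after repeated failures
```

An async version would wrap `report`/`get_proxy` calls around each `fetch_page` attempt.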

2. Distributed Spider Architecture

```python
class DistributedSpider:
    """Redis-backed distributed spider."""

    def __init__(self):
        self.redis_client = None
        self.task_queue = "restaurant:urls"
        self.result_queue = "restaurant:results"

    async def distribute_tasks(self, urls: List[str]):
        """Publish crawl tasks to the shared queue."""
        pass

    async def collect_results(self):
        """Drain crawl results from the shared queue."""
        pass
```
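The queue contract can be demonstrated without a live Redis server. This sketch uses an in-memory stand-in exposing only `rpush`/`lpop`; in production you would pass a real client (e.g. `redis.Redis`) and make the worker loop async:

```python
from collections import deque

class FakeRedis:
    """In-memory stand-in for a Redis client (rpush/lpop only),
    so the queue logic runs without a Redis server."""
    def __init__(self):
        self.lists = {}

    def rpush(self, key, *values):
        self.lists.setdefault(key, deque()).extend(values)

    def lpop(self, key):
        q = self.lists.get(key)
        return q.popleft() if q else None

class QueueSpider:
    TASK_QUEUE = "restaurant:urls"
    RESULT_QUEUE = "restaurant:results"

    def __init__(self, client):
        self.client = client   # a real redis.Redis(...) in production

    def distribute_tasks(self, urls):
        # Producers push URLs; any number of workers can pop them.
        self.client.rpush(self.TASK_QUEUE, *urls)

    def work(self):
        # One worker: claim a URL, "scrape" it, push the result.
        while (url := self.client.lpop(self.TASK_QUEUE)) is not None:
            self.client.rpush(self.RESULT_QUEUE, f"scraped:{url}")

spider = QueueSpider(FakeRedis())
spider.distribute_tasks(["https://example.com/shop/1", "https://example.com/shop/2"])
spider.work()
results = []
while (r := spider.client.lpop(QueueSpider.RESULT_QUEUE)) is not None:
    results.append(r)
print(results)
```

Because workers only share the two queues, more machines can join simply by pointing at the same Redis instance.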

3. Data Quality Monitoring

```python
class DataQualityMonitor:
    """Data quality checks for collected reviews."""

    @staticmethod
    def check_review_quality(review: RestaurantReview) -> Dict:
        # Each check is None-safe, so a missing field simply fails the
        # check instead of raising a TypeError.
        checks = {
            'content_length': len(review.review_content or '') > 10,
            'rating_valid': review.reviewer_rating is not None
                            and 0 <= review.reviewer_rating <= 5,
            'time_valid': review.review_time is not None
                          and review.review_time < datetime.now(),
            'no_duplicate': True,   # TODO: implement deduplication check
        }
        score = sum(checks.values()) / len(checks)
        return {'score': score, 'details': checks}
```
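Here is the quality check exercised end to end, with a lightweight dataclass standing in for the pydantic model so the example runs on the stdlib alone (field names match the model above; the sample reviews are made up):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class Review:
    # Lightweight stand-in for RestaurantReview
    review_content: Optional[str]
    reviewer_rating: Optional[float]
    review_time: Optional[datetime]

def check_review_quality(review: Review) -> dict:
    # None-safe checks: a missing field fails its check rather than raising.
    checks = {
        "content_length": len(review.review_content or "") > 10,
        "rating_valid": review.reviewer_rating is not None
                        and 0 <= review.reviewer_rating <= 5,
        "time_valid": review.review_time is not None
                      and review.review_time < datetime.now(),
    }
    return {"score": sum(checks.values()) / len(checks), "details": checks}

good = Review("Amazing noodles, would come back",
              4.5, datetime.now() - timedelta(days=1))
bad = Review(None, 9.0, None)
print(check_review_quality(good)["score"])   # 1.0
print(check_review_quality(bad)["score"])    # 0.0
```

A monitor can then drop or flag any batch whose average score falls below a chosen threshold.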

Legal and Ethical Considerations

Compliant Crawling Practices

  1. Honor robots.txt: respect each site's crawling policy

  2. Throttle request rates: avoid putting undue load on the target site

  3. Use data responsibly: collect only for lawful purposes and research

  4. Protect user privacy: anonymize personal information

  5. Respect copyright: attribute data sources and avoid commercial infringement
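Point 1 can be automated with the stdlib `urllib.robotparser`. `RobotFileParser.parse()` accepts the file's lines directly, so this sketch needs no network access (the robots.txt content and user-agent name are hypothetical):

```python
from urllib.robotparser import RobotFileParser

# A hypothetical robots.txt, parsed from its raw lines.
robots_txt = """\
User-agent: *
Disallow: /private/
Crawl-delay: 5
"""
rp = RobotFileParser()
rp.parse(robots_txt.splitlines())

# Check each URL before fetching, and honor the declared crawl delay.
print(rp.can_fetch("my-spider", "https://example.com/shop/1001"))  # True
print(rp.can_fetch("my-spider", "https://example.com/private/x"))  # False
print(rp.crawl_delay("my-spider"))                                 # 5
```

In a real crawler you would call `rp.set_url(".../robots.txt"); rp.read()` once per site and gate every `fetch_page` call on `can_fetch`.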

Performance Tuning Tips

  1. Connection pooling: reuse database and HTTP connections

  2. Memory efficiency: process large datasets with generators

  3. Retry with backoff: use an exponential backoff strategy for transient failures

  4. Caching: avoid re-requesting pages you have already fetched

  5. Async file I/O: speed up data writes
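Point 3 fits in a few lines. This sketch computes the delay schedule that a retry loop (or the tenacity decorator used earlier) would sleep between attempts; the base, cap, and "full jitter" strategy are illustrative choices:

```python
import random

def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 60.0):
    """Yield exponentially growing retry delays with full jitter."""
    for attempt in range(max_retries):
        ceiling = min(cap, base * 2 ** attempt)   # 1, 2, 4, 8, ... capped
        # Jitter de-synchronizes many clients retrying at once.
        yield random.uniform(0, ceiling)

delays = list(backoff_delays(5))
print(len(delays))                       # 5
print(all(d <= 60.0 for d in delays))    # True
```

Each delay is bounded by `min(cap, base * 2**attempt)`, so retries back off quickly without ever sleeping longer than the cap.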
