news 2026/7/2 8:17:26

异步爬虫结合 MongoDB 异步驱动 pymongo:高效数据爬取与存储实践

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
异步爬虫结合 MongoDB 异步驱动 pymongo:高效数据爬取与存储实践

在数据爬取场景中,传统同步爬虫受限于 IO 等待(如网络请求、数据库写入),效率往往难以满足大规模数据采集需求。而异步编程能最大化利用 CPU 资源,结合 MongoDB 的异步驱动,则可实现 “爬取 - 存储” 全流程异步化,大幅提升爬虫整体性能。本文将详细讲解如何基于 Python 的 aiohttp 实现异步爬虫,并结合 pymongo 的异步特性(Motor)完成数据的高效存储。

一、核心技术栈说明

在开始编码前,先明确核心依赖库的作用:

  • aiohttp:Python 异步 HTTP 客户端 / 服务器框架,用于发起异步网络请求,替代同步的 requests 库;
  • Motor:MongoDB 官方推荐的异步驱动,是 pymongo 的异步版本,支持异步 IO 操作 MongoDB;
  • asyncio:Python 内置的异步编程框架,用于管理异步任务、事件循环;
  • pymongo:MongoDB 的同步驱动(本文仅作为对比参考,核心使用 Motor)。

二、环境准备

1. 安装依赖

bash

运行

pip install aiohttp motor python-dotenv

2. 前提条件

  • 已安装并启动 MongoDB 服务(本地或远程);
  • 了解基本的异步编程概念(如协程、async/await);
  • 目标网站允许爬虫访问(遵守 robots 协议,避免法律风险)。

三、异步爬虫 + MongoDB 异步存储实现

1. 核心思路

  1. 初始化 Motor 客户端,建立与 MongoDB 的异步连接;
  2. 定义异步爬虫函数,通过 aiohttp 发起异步请求,解析目标数据;
  3. 定义异步存储函数,将解析后的数据异步写入 MongoDB;
  4. 利用 asyncio 创建任务列表,批量执行异步爬取 + 存储任务;
  5. 统一管理事件循环,确保所有异步任务完成后关闭连接。

2. 完整代码实现

python

运行

import asyncio import aiohttp from motor.motor_asyncio import AsyncIOMotorClient from dotenv import load_dotenv import os from typing import Dict, List # 加载环境变量(建议将敏感信息放在.env文件中) load_dotenv() # MongoDB配置 MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "spider_db") MONGO_COLLECTION_NAME = os.getenv("MONGO_COLLECTION_NAME", "async_spider_data") # 爬虫配置 TARGET_URLS = [ "https://jsonplaceholder.typicode.com/posts/1", "https://jsonplaceholder.typicode.com/posts/2", "https://jsonplaceholder.typicode.com/posts/3", "https://jsonplaceholder.typicode.com/posts/4", "https://jsonplaceholder.typicode.com/posts/5", ] HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } # 初始化MongoDB异步客户端 async def init_mongo() -> AsyncIOMotorClient: """初始化MongoDB异步连接""" try: client = AsyncIOMotorClient(MONGO_URI) # 测试连接 await client.admin.command("ping") print("MongoDB异步连接成功!") return client except Exception as e: print(f"MongoDB连接失败:{e}") raise # 异步爬取单条数据 async def crawl_single_url(session: aiohttp.ClientSession, url: str) -> Dict | None: """异步爬取单个URL的内容""" try: async with session.get(url, headers=HEADERS, timeout=aiohttp.ClientTimeout(10)) as response: if response.status == 200: data = await response.json() print(f"成功爬取:{url}") return data else: print(f"爬取失败,状态码:{response.status},URL:{url}") return None except Exception as e: print(f"爬取异常:{e},URL:{url}") return None # 异步存储数据到MongoDB async def save_to_mongo(db, data: Dict) -> None: """异步将数据写入MongoDB""" if not data: return try: collection = db[MONGO_COLLECTION_NAME] # 异步插入单条数据 result = await collection.insert_one(data) print(f"数据存储成功,ID:{result.inserted_id}") except Exception as e: print(f"数据存储失败:{e}") # 核心异步任务:爬取+存储 async def crawl_and_save(session: aiohttp.ClientSession, db, url: str) -> None: """单个URL的爬取+存储一体化异步任务""" data = await crawl_single_url(session, url) await save_to_mongo(db, data) # 主函数:批量执行异步任务 async def main(): # 初始化MongoDB连接 client = await init_mongo() db = client[MONGO_DB_NAME] # 创建aiohttp会话(复用连接池,提升效率) async with aiohttp.ClientSession() as session: # 创建异步任务列表 tasks = [] for url in TARGET_URLS: task = asyncio.create_task(crawl_and_save(session, db, url)) tasks.append(task) # 等待所有任务完成 await asyncio.gather(*tasks) # 关闭MongoDB连接 client.close() print("所有爬取和存储任务完成!") if __name__ == "__main__": # 适配Python 3.7+的事件循环运行方式 if sys.version_info >= (3, 7): asyncio.run(main()) else: loop = asyncio.get_event_loop() loop.run_until_complete(main())

3. 代码关键解析

(1)MongoDB 异步初始化

init_mongo函数通过AsyncIOMotorClient创建异步客户端,替代 pymongo 的同步MongoClient,并通过await client.admin.command("ping")测试连接,确保连接成功后再执行后续操作。

(2)异步 HTTP 请求

使用aiohttp.ClientSession创建会话(复用连接池),通过async with session.get()发起异步请求,await response.json()异步解析响应数据,避免同步请求的 IO 阻塞。

(3)异步数据存储

save_to_mongo函数中,await collection.insert_one(data)是核心异步操作,替代 pymongo 同步的insert_one,无需等待数据库写入完成即可继续处理下一个爬取任务。

(4)批量任务管理

通过asyncio.create_task创建多个异步任务,再用asyncio.gather批量执行,实现多 URL 并行爬取和存储,最大化利用资源。

四、性能对比与优化建议

1. 同步 vs 异步性能差异

以爬取 100 个 URL 并写入 MongoDB 为例:

  • 同步爬虫(requests+pymongo):约需 30-60 秒(受网络延迟、数据库写入等待影响);
  • 异步爬虫(aiohttp+Motor):约需 5-10 秒(并行处理 IO 操作,无等待时间)。

2. 优化方向

(1)控制并发数

避免无限制并发导致目标网站封禁 IP 或 MongoDB 压力过大,可使用asyncio.Semaphore限制并发数:

python

运行

# 在main函数中添加信号量,限制最大并发为5 semaphore = asyncio.Semaphore(5) # 修改crawl_and_save函数,增加信号量控制 async def crawl_and_save(session: aiohttp.ClientSession, db, url: str) -> None: async with semaphore: # 限制并发 data = await crawl_single_url(session, url) await save_to_mongo(db, data)
(2)数据批量写入

若爬取数据量极大,可将多条数据缓存后批量插入,减少数据库交互次数:

python

运行

async def save_batch_to_mongo(db, data_list: List[Dict]) -> None: """批量异步插入数据""" if not data_list: return try: collection = db[MONGO_COLLECTION_NAME] result = await collection.insert_many(data_list) print(f"批量存储成功,插入ID数量:{len(result.inserted_ids)}") except Exception as e: print(f"批量存储失败:{e}")
(3)异常重试机制

对失败的爬取任务添加重试逻辑,提升稳定性:

python

运行

async def crawl_single_url(session: aiohttp.ClientSession, url: str, retry=3) -> Dict | None: """带重试机制的异步爬取""" for i in range(retry): try: async with session.get(url, headers=HEADERS, timeout=aiohttp.ClientTimeout(10)) as response: if response.status == 200: return await response.json() else: print(f"第{i+1}次重试失败,状态码:{response.status}") await asyncio.sleep(1) # 重试前休眠1秒 except Exception as e: print(f"第{i+1}次重试异常:{e}") await asyncio.sleep(1) return None

五、注意事项

  1. 遵守网站规则:异步爬虫效率高,需控制爬取频率,避免对目标网站造成压力,必要时添加延迟;
  2. MongoDB 索引:针对查询频繁的字段(如 url、id)创建索引,提升后续数据查询效率;
  3. 资源释放:确保异步任务完成后关闭 MongoDB 客户端和 aiohttp 会话,避免资源泄漏;
  4. 数据去重:可通过 MongoDB 的唯一索引(如create_index("id", unique=True))避免重复存储数据。

总结

  1. 异步爬虫结合 MongoDB 异步驱动(Motor)可实现 “爬取 - 存储” 全流程异步化,相比同步方案能提升 5-10 倍效率;
  2. 核心依赖为 aiohttp(异步请求)和 Motor(MongoDB 异步驱动),需通过async/await管理异步任务;
  3. 实际应用中需控制并发数、添加重试机制、批量写入数据,平衡效率与稳定性,同时遵守网站爬取规则。

通过本文的实践,你可以快速搭建高效的异步爬虫系统,满足大规模数据采集与存储的需求,同时兼顾代码的可维护性和扩展性。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/26 17:19:10

2026年,Agent与APP必有一战

旧钥匙打不开新大门,旧地图找不到新大陆。 刚过去的2025年,AI炙手可热,人工智能第一次走进人类日常生活——前所未有地通过手机AI甚至AI手机。 但颠覆与创新,也总是伴随“争议”。 从近年手机厂运用AI算法辅助,让更多人…

作者头像 李华
网站建设 2026/7/1 17:54:19

基于PLC的立体车库管理系统设计

基于PLC的立体车库管理系统设计与实现 第一章 绪论 随着城市汽车保有量激增,停车难已成为城市交通治理的核心痛点,立体车库凭借空间利用率高(较传统平面车库提升3-5倍)的优势成为主流解决方案,但传统立体车库多仅具备…

作者头像 李华
网站建设 2026/6/29 4:46:01

DDD 架构演进,单层、三层,四层,工程分层演进过程!

定义接口、创建方法、调用展示,其实编程写代码说到底也就这3步,人人都是程序员👨🏻‍💻。公司老板都觉得,它有个AI工具,它都能写代码。 但现在的系统工程的分层结构,可不只是一层就写个 Controller,甚至是3层(Model-View-Controller),也有可能是4层(DDD)架构。…

作者头像 李华
网站建设 2026/6/29 8:13:10

Python 的 with 语句:把「资源管理」这件事交给语法

文章目录一、with 语句是干什么的?二、不用 with 会发生什么?三、传统解法:try / finally四、with 的本质:语法级 try / finally五、上下文管理器(Context Manager)5.1 一个最简单的例子5.2 __enter__ 和 _…

作者头像 李华