爬虫最难的不是把数据爬下来,而是把乱七八糟的原始数据洗干净存好。这一篇讲爬虫后处理的完整流程——清洗、去重、存储。
一、脏数据的常见问题
爬下来的原始数据基本是这样的:
data=[{"title":" iPhone 15 ","price":" ¥6,999 ","stock":"100件"},{"title":"华为 Mate 60","price":"null","stock":"50"},{"title":"","price":"5.99","stock":"-1"},{"title":"小米 14","price":"3999","stock":"abc"},None,]问题清单:
- 空格、换行符、特殊符号混在字符串里
- 价格带货币符号和逗号(“¥6,999”)
- 空值、None、null 混杂
- 数据类型不对(库存是数字但爬下来是字符串)
- 脏数据(负数库存、空标题)
- 重复数据
二、pandas 数据清洗三板斧
1. 去除空格和特殊字符
importpandasaspd df=pd.DataFrame(data)# 去除字符串两端的空格和换行df["title"]=df["title"].str.strip()# 去除所有空格(有些用全角空格)df["title"]=df["title"].str.replace(r"\s+","",regex=True)# 清洗价格字段:去符号、去逗号df["price"]=df["price"].str.replace(r"[¥¥$,]","",regex=True)df["price"]=pd.to_numeric(df["price"],errors="coerce")# errors="coerce":遇到不能转的变成 NaN# 清洗库存df["stock"]=pd.to_numeric(df["stock"],errors="coerce")print(df.dtypes)# 确认类型已经转成数字2. 处理空值
# 查看哪些列有空值print(df.isnull().sum())# 删除空值过多的行df=df.dropna(subset=["title"])# 标题为空就删掉# 填充数值列的空值df["price"]=df["price"].fillna(0)df["stock"]=df["stock"].fillna(0)# 或者用均值填充df["price"]=df["price"].fillna(df["price"].mean())# 填充字符串列df["brand"]=df["brand"].fillna("未知")3. 去重
# 查看重复行数print(df.duplicated().sum())# 删除完全重复的行df=df.drop_duplicates()# 指定列去重(保留第一次出现的)df=df.drop_duplicates(subset=["title"])# 查看某列重复值print(df["title"].value_counts())# 过滤异常数据df=df[df["price"]>0]# 价格必须大于0df=df[df["stock"]>=0]# 库存不能是负数4. 常用数据转换
# 类型转换df["price"]=df["price"].astype(float)df["stock"]=df["stock"].astype(int)# 处理时间df["crawl_time"]=pd.to_datetime(df["crawl_time"])# 提取字段中的信息df["city"]=df["address"].str.extract(r"(北京|上海|广州|深圳|郑州)")# 统一文本格式(如品牌名统一小写)df["brand"]=df["brand"].str.lower().str.strip()三、存储到 MySQL
1. pandas 直接写入(最简单)
fromsqlalchemyimportcreate_engineimportpandasaspd# 连接 MySQLengine=create_engine("mysql+pymysql://root:123456@localhost:3306/spider_db?charset=utf8mb4")# DataFrame 直接写入 MySQL 表# 如果表已存在,replace 替换,append 追加df.to_sql(name="products",# 表名con=engine,if_exists="append",# 追加模式index=False,# 不保存 DataFrame 的索引chunksize=1000,# 分批写入,每批1000条)print("数据已写入 MySQL")2. 逐条写入(适合自定义处理)
importpymysql conn=pymysql.connect(host="localhost",port=3306,user="root",password="123456",database="spider_db",charset="utf8mb4",)cursor=conn.cursor()# 批量插入sql="""INSERT INTO products (title, price, brand, stock, crawl_time) VALUES (%s, %s, %s, %s, %s)"""data_list=[("iPhone 15",6999,"Apple",100,"2026-06-26"),("华为 Mate 60",5999,"华为",50,"2026-06-26"),]cursor.executemany(sql,data_list)conn.commit()print(f"插入了{cursor.rowcount}条数据")cursor.close()conn.close()3. 更新 vs 插入(去重逻辑)
重复爬取时,同一件商品可能出现两次。处理方式:
# 方案一:INSERT IGNORE(主键或唯一索引冲突时跳过)sql="INSERT IGNORE INTO products (id, title, price) VALUES (%s, %s, %s)"# 方案二:REPLACE INTO(冲突时替换)sql="REPLACE INTO products (id, title, price) VALUES (%s, %s, %s)"# 方案三:ON DUPLICATE KEY UPDATE(冲突时更新指定字段)sql="""INSERT INTO products (id, title, price, crawl_time) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE price = VALUES(price), crawl_time = VALUES(crawl_time)"""推荐方案三,既保留旧数据,又更新最新价格和爬取时间。
四、存储到 MongoDB
MongoDB 适合结构不固定的爬虫数据——不同商品字段可能不一样(手机有"像素",冰箱有"容积"),用 MongoDB 不用提前建表。
1. 写入
frompymongoimportMongoClient# 连接 MongoDBclient=MongoClient("localhost",27017)db=client["spider_db"]collection=db["products"]# 插入一条collection.insert_one({"title":"iPhone 15","price":6999,"brand":"Apple","crawl_time":"2026-06-26",})# 批量插入(直接传 DataFrame)collection.insert_many(df.to_dict("records"))print(f"插入了{len(df)}条")2. 去重更新
# 以 title 为唯一标识,有则更新,无则插入for_,rowindf.iterrows():collection.update_one({"title":row["title"]},# 查询条件{"$set":row.to_dict()},# 更新内容upsert=True,# 不存在就插入)3. 查询
# 价格大于5000的商品forproductincollection.find({"price":{"$gt":5000}}):print(product["title"],product["price"])# 按品牌统计数量pipeline=[{"$group":{"_id":"$brand","count":{"$sum":1}}},{"$sort":{"count":-1}},]forresultincollection.aggregate(pipeline):print(result["_id"],result["count"])五、MySQL vs MongoDB 怎么选
| MySQL | MongoDB | |
|---|---|---|
| 数据格式 | 需要提前建表,字段固定 | JSON 格式,字段灵活 |
| 爬虫场景 | 同一种商品,字段稳定 | 不同来源,字段不一致 |
| 查询 | 适合关联查询、统计 | 适合存和查,不适合复杂关联 |
| 上手 | 需要建表写 SQL | 直接 insert,零门槛 |
实际建议:
- 爬电商商品(字段固定)→MySQL
- 爬多种网站,数据格式不统一 →MongoDB
- 小数据量(几千条)→JSON 文件就够了
六、完整流水线示例
importpandasaspdfromsqlalchemyimportcreate_engineimportrequestsfrombs4importBeautifulSoupimporttimedefcrawl_products():"""爬虫 → 清洗 → 存储 一条龙"""# 1. 爬取all_data=[]forpageinrange(1,6):resp=requests.get(f"https://example.com/products?page={page}")soup=BeautifulSoup(resp.text,"html.parser")foriteminsoup.select(".product"):all_data.append({"title":item.select_one(".title").text,"price":item.select_one(".price").text,"brand":item.select_one(".brand").text,"stock":item.select_one(".stock").text,})time.sleep(1)print(f"爬取完成,共{len(all_data)}条原始数据")# 2. 清洗df=pd.DataFrame(all_data)df["title"]=df["title"].str.strip()df["price"]=df["price"].str.replace(r"[¥¥$,]","",regex=True)df["price"]=pd.to_numeric(df["price"],errors="coerce")df["stock"]=pd.to_numeric(df["stock"],errors="coerce")df=df.dropna(subset=["title"])df=df.drop_duplicates(subset=["title"])df=df[df["price"]>0]df["crawl_time"]=pd.Timestamp.now()print(f"清洗后剩余{len(df)}条有效数据")# 3. 存储engine=create_engine("mysql+pymysql://root:123456@localhost:3306/spider_db?charset=utf8mb4")df.to_sql("products",con=engine,if_exists="append",index=False)print("数据已保存到 MySQL")returndfif__name__=="__main__":crawl_products()总结
爬虫的核心公式:
爬下来 → 清洗(去空格/去重/类型转换/去异常) → 存储(MySQL/MongoDB/文件) ↑ ↑ 最耗时的环节 按场景选很多人把精力都放在爬取上,结果数据存进去一堆脏数据,后面分析根本没法用。洗数据花的功夫,往往比写爬虫还多,但这步做好了,后面的分析才会顺利。
💡 觉得有用的话,点赞 + 关注【张老师技术栈】吧!每周更新 Java/Python/爬虫 实战干货,不让你白来。