news 2026/2/2 18:49:50

Real-Time Stock Data Scraping in Python: An Efficient Approach with an Async Crawler and WebSocket


张小明

Frontend Development Engineer


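Abstract

This article explains how to build an efficient, real-time stock data scraping system in Python. By combining asynchronous crawling, the WebSocket protocol, and modern financial APIs, we build a complete solution for monitoring stock market movements in real time. The article covers the underlying techniques, the code, and performance tips, and runs to more than 2,000 characters.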

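Tech Stack

  • Async crawler framework: aiohttp + asyncio

  • Data parsing: BeautifulSoup4 + pandas

  • Real-time data push: WebSocket connection

  • Data storage: SQLite + pandas

  • Visualization: plotly (matplotlib optional)

  • API services: free financial data APIs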

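1. Analyzing and Choosing Stock Data Sources

1.1 Comparison of Free Data Sources

  • Alpha Vantage: free API, limited to 5 requests per minute

  • Yahoo Finance: accessed through an unofficial API; stability is only fair

  • Tencent Finance: Chinese market data with fast access from mainland China

  • Sina Finance: offers a real-time quote interface

1.2 Data Sources Used in This Article

We combine several data sources to keep the data complete and reliable:

  • Real-time data: Sina Finance quote interface (polled over HTTP), plus a WebSocket push feed

  • Historical data: Alpha Vantage API

  • Fallback: Tencent Finance API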
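The multi-source setup above boils down to a fallback chain: query the primary source first, then ask each backup only for the symbols that are still missing. A minimal sketch, assuming each source is wrapped as an async function that takes a list of codes and returns `{symbol: quote}` (an empty dict meaning "no data"); `fetch_with_fallback` and that interface are illustrative, not taken from any library.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Hypothetical interface: an async callable taking stock codes and
# returning {symbol: quote_dict}; missing symbols are simply absent.
Source = Callable[[List[str]], Awaitable[Dict[str, dict]]]


async def fetch_with_fallback(sources: List[Source], symbols: List[str]) -> Dict[str, dict]:
    """Try each source in order, asking later sources only for symbols still missing."""
    result: Dict[str, dict] = {}
    for source in sources:
        missing = [s for s in symbols if s not in result]
        if not missing:
            break
        try:
            data = await source(missing)
        except Exception:
            # A failing source is skipped instead of aborting the whole fetch
            continue
        for symbol, quote in data.items():
            if symbol in missing:
                result[symbol] = quote
    return result
```

For example, `fetch_with_fallback([sina_source, tencent_source], symbols)`, where both names stand for hypothetical wrappers around the HTTP interfaces listed above.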

2. Environment Setup and Dependency Installation

```text
# requirements.txt
aiohttp==3.8.5
aiofiles==23.2.1
beautifulsoup4==4.12.2
pandas==2.1.4
websockets==12.0
matplotlib==3.8.2
plotly==5.18.0
requests==2.31.0
httpx==0.25.1
sqlalchemy==2.0.23
schedule==1.2.0
```

Install command:

```bash
pip install -r requirements.txt
```

3. Asynchronous Stock Data Crawler

3.1 Basic Async Crawler Framework

```python
import asyncio
import aiohttp
import pandas as pd
from datetime import datetime, timedelta
import json
import sqlite3
from typing import List, Dict, Optional
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class StockDataFetcher:
    """Asynchronous stock data fetcher"""

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.session = None
        # Caps the number of requests in flight at the same time
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': '*/*',  # the response is JavaScript text, not JSON
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                # hq.sinajs.cn rejects requests without a Sina Referer
                'Referer': 'https://finance.sina.com.cn',
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_sina_realtime(self, symbols: List[str]) -> Dict:
        """Fetch real-time quotes from Sina Finance"""
        url = "http://hq.sinajs.cn/list="
        # Add the exchange prefix: codes starting with 6 are Shanghai (sh),
        # the rest are treated as Shenzhen (sz), e.g. sh600519,sz000002
        formatted_symbols = []
        for symbol in symbols:
            if symbol.startswith('6'):
                formatted_symbols.append(f"sh{symbol}")
            else:
                formatted_symbols.append(f"sz{symbol}")
        full_url = f"{url}{','.join(formatted_symbols)}"

        async with self.semaphore:
            try:
                async with self.session.get(full_url) as response:
                    if response.status == 200:
                        text = await response.text(encoding='gbk')
                        return self._parse_sina_data(text, symbols)
                    logger.error(f"Request failed: HTTP {response.status}")
                    return {}
            except Exception as e:
                logger.error(f"Failed to fetch real-time data: {e}")
                return {}

    def _parse_sina_data(self, raw_data: str, symbols: List[str]) -> Dict:
        """Parse Sina's response: one line per requested code, in request order.

        Each line looks like var hq_str_sh600519="name,open,prev_close,price,...";
        """
        result = {}
        lines = raw_data.strip().split('\n')
        for i, line in enumerate(lines):
            if i >= len(symbols):
                break
            try:
                # Extract the quoted, comma-separated payload
                data_str = line.split('="')[1].rstrip('";')
                data = data_str.split(',')
                if len(data) >= 32:
                    result[symbols[i]] = {
                        'symbol': symbols[i],
                        'name': data[0],
                        'open': float(data[1]),
                        'prev_close': float(data[2]),   # previous close
                        'price': float(data[3]),        # latest price
                        'high': float(data[4]),
                        'low': float(data[5]),
                        'volume': int(float(data[8])),  # cumulative shares today
                        'amount': float(data[9]),       # cumulative turnover today
                        # Order-book levels come as (volume, price) pairs
                        'bid1_volume': int(float(data[10])),
                        'bid1': float(data[11]),
                        'ask1_volume': int(float(data[20])),
                        'ask1': float(data[21]),
                        # Quote date and time as reported by Sina
                        'timestamp': f"{data[30]} {data[31]}",
                    }
            except (IndexError, ValueError) as e:
                logger.warning(f"Failed to parse line: {line[:50]}... - {e}")
        return result
```
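The parser above relies on the response lines arriving in the same order as the requested codes. A slightly more defensive variant reads the exchange-prefixed code embedded in each `var hq_str_...="...";` line instead. This sketch assumes the commonly observed field layout of this unofficial interface (0 name, 1 open, 2 previous close, 3 price, 4 high, 5 low, 8 volume, 9 turnover, bid and ask volume/price pairs starting at 10 and 20, date and time at 30 and 31); the function name and dictionary keys are illustrative.

```python
import re
from typing import Dict

# One line of the hq.sinajs.cn response, e.g. var hq_str_sh600519="...";
_LINE_RE = re.compile(r'var hq_str_(?P<code>[a-z]{2}\d{6})="(?P<body>[^"]*)";?')


def parse_sina_response(text: str) -> Dict[str, dict]:
    """Parse a multi-line Sina quote response, keyed by the code inside each line."""
    quotes: Dict[str, dict] = {}
    for match in _LINE_RE.finditer(text):
        fields = match.group("body").split(",")
        if len(fields) < 32:
            continue  # empty body (unknown code) or a non-A-share layout
        quotes[match.group("code")] = {
            "name": fields[0],
            "open": float(fields[1]),
            "prev_close": float(fields[2]),
            "price": float(fields[3]),
            "high": float(fields[4]),
            "low": float(fields[5]),
            "volume": int(float(fields[8])),
            "amount": float(fields[9]),
            "bid1_volume": int(float(fields[10])),
            "bid1": float(fields[11]),
            "ask1_volume": int(float(fields[20])),
            "ask1": float(fields[21]),
            "timestamp": f"{fields[30]} {fields[31]}",
        }
    return quotes
```

Keying the result by the full code (`sh600000`) also keeps Shanghai and Shenzhen listings that share a six-digit number, such as `sh000001` (an index) and `sz000001`, apart.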

3.2 Subscribing to Real-Time Data over WebSocket

```python
import asyncio
import json
import logging
from datetime import datetime
from typing import Callable, Dict, List

import websockets

logger = logging.getLogger(__name__)


class StockWebSocketClient:
    """WebSocket client for real-time stock quotes"""

    def __init__(self, callback: Callable):
        self.callback = callback  # async function receiving a list of quote dicts
        self.websocket = None
        self.running = False

    async def connect(self, symbols: List[str]):
        """Connect to the WebSocket server and subscribe to the given codes"""
        # Push endpoint used in this article (an Eastmoney domain). The subscription
        # message and field names below depend on the provider's protocol.
        url = "wss://push2.eastmoney.cn/websocket"
        try:
            async with websockets.connect(url) as websocket:
                self.websocket = websocket
                self.running = True
                # Subscribe to the stock codes
                subscribe_msg = {
                    "Action": "Subscribe",
                    "Codes": symbols,
                    "Fields": ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
                }
                await websocket.send(json.dumps(subscribe_msg))
                logger.info(f"Subscribed to {len(symbols)} stocks")
                # Receive data until the connection closes
                await self.receive_messages()
        except Exception as e:
            logger.error(f"WebSocket connection failed: {e}")

    async def receive_messages(self):
        """Receive messages and pass normalized quotes to the callback"""
        while self.running:
            try:
                message = await self.websocket.recv()
                data = json.loads(message)
                if isinstance(data, dict) and 'Data' in data:
                    await self.callback(self._process_websocket_data(data['Data']))
            except websockets.exceptions.ConnectionClosed:
                logger.warning("WebSocket connection closed")
                break
            except Exception as e:
                logger.error(f"Failed to handle message: {e}")

    def _process_websocket_data(self, raw_data: List) -> List[Dict]:
        """Normalize raw WebSocket records into quote dicts"""
        processed = []
        for item in raw_data:
            try:
                processed.append({
                    'symbol': item.get('Code'),
                    'name': item.get('Name'),
                    'price': item.get('Price', 0),
                    'change': item.get('Change', 0),
                    'change_percent': item.get('ChangePercent', 0),
                    'volume': item.get('Volume', 0),
                    'amount': item.get('Amount', 0),
                    # Same format as the HTTP data so rows sort consistently
                    'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
            except Exception as e:
                logger.error(f"Failed to process stock record: {e}")
        return processed

    async def disconnect(self):
        """Stop receiving and close the connection"""
        self.running = False
        if self.websocket:
            await self.websocket.close()
```
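The client above connects once; if the server drops the connection, `receive_messages()` simply stops. A common remedy is to reconnect with exponential backoff. This is a generic sketch: `connect_once` stands for any coroutine that runs one session, and `run_with_reconnect` is a hypothetical helper, not part of the `websockets` library. Because `connect()` above logs and swallows its own errors, a wrapper would need to re-raise them; otherwise every retry uses the base delay.

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional


async def run_with_reconnect(
    connect_once: Callable[[], Awaitable[None]],
    should_run: Callable[[], bool],
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    max_attempts: Optional[int] = None,
) -> int:
    """Call connect_once() repeatedly while should_run() is true; return the attempt count.

    Failed sessions back off exponentially (base_delay, 2x, 4x, ... up to
    max_delay) plus up to 10% random jitter; a session that ends without an
    exception resets the backoff.
    """
    attempts = 0
    failures = 0
    while should_run() and (max_attempts is None or attempts < max_attempts):
        attempts += 1
        try:
            await connect_once()
            failures = 0
            delay = base_delay
        except Exception:
            failures += 1
            delay = min(max_delay, base_delay * 2 ** (failures - 1))
            delay += random.uniform(0, delay * 0.1)
        if should_run():
            await asyncio.sleep(delay)
    return attempts
```

Recent releases of the `websockets` library can also reconnect on their own when `websockets.connect(url)` is used as an async iterator (`async for websocket in websockets.connect(url): ...`), which may be simpler than a hand-rolled loop.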

3.3 Alpha Vantage API Integration

```python
import logging
from typing import Any, Dict

import httpx
import pandas as pd

logger = logging.getLogger(__name__)


class AlphaVantageClient:
    """Alpha Vantage API client"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://www.alphavantage.co/query"
        self.client = httpx.AsyncClient(timeout=30.0)

    async def get_intraday_data(self, symbol: str, interval: str = "5min") -> pd.DataFrame:
        """Fetch intraday OHLCV bars"""
        params = {
            "function": "TIME_SERIES_INTRADAY",
            "symbol": symbol,
            "interval": interval,
            "apikey": self.api_key,
            "outputsize": "compact",
            "datatype": "json"
        }
        try:
            response = await self.client.get(self.base_url, params=params)
            data = response.json()
            # The payload key follows the requested interval, e.g. "Time Series (5min)"
            key = f"Time Series ({interval})"
            if key in data:
                df = pd.DataFrame.from_dict(data[key], orient='index')
                # Raw columns are "1. open", "2. high", "3. low", "4. close", "5. volume"
                df.columns = ['open', 'high', 'low', 'close', 'volume']
                df.index = pd.to_datetime(df.index)
                return df.astype(float).sort_index()
            # Errors and rate-limit notices come back as JSON messages
            message = (data.get('Error Message') or data.get('Note')
                       or data.get('Information') or 'unknown error')
            logger.error(f"Unexpected response for {symbol}: {message}")
            return pd.DataFrame()
        except Exception as e:
            logger.error(f"Failed to fetch intraday data: {e}")
            return pd.DataFrame()

    async def get_company_overview(self, symbol: str) -> Dict[str, Any]:
        """Fetch company overview information"""
        params = {
            "function": "OVERVIEW",
            "symbol": symbol,
            "apikey": self.api_key
        }
        try:
            response = await self.client.get(self.base_url, params=params)
            return response.json()
        except Exception as e:
            logger.error(f"Failed to fetch company overview: {e}")
            return {}

    async def close(self):
        """Close the underlying HTTP client"""
        await self.client.aclose()
```
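Sending several Alpha Vantage requests concurrently can exceed the free-tier quota listed in section 1.1 (5 requests per minute). A small sliding-window limiter, awaited before each request, keeps the client under such a quota. This is a hypothetical helper written with plain asyncio; neither httpx nor Alpha Vantage provides it.

```python
import asyncio
import time
from collections import deque


class AsyncRateLimiter:
    """Sliding-window limiter: at most max_calls acquisitions per `period` seconds.

    Hypothetical helper; e.g. AsyncRateLimiter(5, 60.0) for 5 requests per minute.
    """

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self._calls = deque()        # monotonic timestamps of recent acquisitions
        self._lock = asyncio.Lock()  # waiters are served one at a time

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                # Forget calls that have left the window
                while self._calls and now - self._calls[0] >= self.period:
                    self._calls.popleft()
                if len(self._calls) < self.max_calls:
                    self._calls.append(now)
                    return
                # Sleep until the oldest recorded call leaves the window
                await asyncio.sleep(self.period - (now - self._calls[0]))
```

In use, create one `AsyncRateLimiter(5, 60.0)` per API key and `await limiter.acquire()` right before each `self.client.get(...)` call; requests beyond the quota then wait instead of coming back as rate-limit notices.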

4. Data Storage and Management

```python
import logging
import sqlite3
from contextlib import contextmanager
from typing import Dict

import pandas as pd

logger = logging.getLogger(__name__)


class StockDatabase:
    """Stock data database manager"""

    def __init__(self, db_path: str = "stocks.db"):
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Create the tables and indexes if they do not exist"""
        with self._get_connection() as conn:
            # Real-time quote snapshots
            conn.execute('''
                CREATE TABLE IF NOT EXISTS realtime_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    name TEXT,
                    price REAL,
                    open_price REAL,
                    high REAL,
                    low REAL,
                    volume INTEGER,
                    amount REAL,
                    timestamp DATETIME,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Indexes for per-symbol, time-ordered queries
            conn.execute('CREATE INDEX IF NOT EXISTS idx_symbol ON realtime_data(symbol)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON realtime_data(timestamp)')
            # Historical bars: one row per symbol and date
            conn.execute('''
                CREATE TABLE IF NOT EXISTS historical_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    date DATE,
                    open REAL,
                    high REAL,
                    low REAL,
                    close REAL,
                    volume INTEGER,
                    adj_close REAL,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(symbol, date)
                )
            ''')

    @contextmanager
    def _get_connection(self):
        """Open a connection, commit if the block succeeds, and always close it"""
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    async def save_realtime_data(self, stock_data: Dict):
        """Save real-time quotes ({symbol: quote_dict}) in one batch insert"""
        rows = [
            (data.get('symbol', symbol), data.get('name'), data.get('price'),
             data.get('open'), data.get('high'), data.get('low'),
             data.get('volume'), data.get('amount'), data.get('timestamp'))
            for symbol, data in stock_data.items()
        ]
        # sqlite3 calls block the event loop; acceptable for small batches
        with self._get_connection() as conn:
            conn.executemany('''
                INSERT INTO realtime_data
                (symbol, name, price, open_price, high, low, volume, amount, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', rows)

    async def save_historical_data(self, symbol: str, df: pd.DataFrame):
        """Save historical bars from a DataFrame indexed by date"""
        if df.empty:
            return
        with self._get_connection() as conn:
            for date, row in df.iterrows():
                try:
                    conn.execute('''
                        INSERT OR REPLACE INTO historical_data
                        (symbol, date, open, high, low, close, volume, adj_close)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        symbol, date.strftime('%Y-%m-%d'),
                        row['open'], row['high'], row['low'], row['close'],
                        int(row['volume']), row.get('adj_close', row['close'])
                    ))
                except Exception as e:
                    logger.error(f"Failed to save historical data: {e}")

    def get_latest_data(self, symbol: str, limit: int = 100) -> pd.DataFrame:
        """Return the most recent real-time rows for a symbol, newest first"""
        with self._get_connection() as conn:
            query = '''
                SELECT * FROM realtime_data
                WHERE symbol = ?
                ORDER BY timestamp DESC
                LIMIT ?
            '''
            df = pd.read_sql_query(query, conn, params=(symbol, limit))
        return df
```
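`save_realtime_data()` is declared `async` but runs blocking sqlite3 calls, so each write briefly stalls the event loop, including the WebSocket receiver. One way to keep writes batched and off the loop is `asyncio.to_thread` (Python 3.9+) combined with `executemany`. The function names below are hypothetical; the column list matches the `realtime_data` table defined above.

```python
import asyncio
import sqlite3
from typing import Dict, List, Tuple

COLUMNS = ("symbol", "name", "price", "open_price", "high", "low",
           "volume", "amount", "timestamp")
INSERT_SQL = (
    f"INSERT INTO realtime_data ({', '.join(COLUMNS)}) "
    f"VALUES ({', '.join('?' * len(COLUMNS))})"
)


def _insert_batch(db_path: str, rows: List[Tuple]) -> int:
    """Blocking part: one connection, one transaction, one executemany call."""
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.executemany(INSERT_SQL, rows)
        return len(rows)
    finally:
        conn.close()


async def save_realtime_batch(db_path: str, stock_data: Dict[str, dict]) -> int:
    """Build rows on the loop, then run the SQLite write in a worker thread."""
    rows = [
        (d.get("symbol", sym), d.get("name"), d.get("price"), d.get("open"),
         d.get("high"), d.get("low"), d.get("volume"), d.get("amount"),
         d.get("timestamp"))
        for sym, d in stock_data.items()
    ]
    if not rows:
        return 0
    return await asyncio.to_thread(_insert_batch, db_path, rows)
```

Each call opens its own connection inside the worker thread, which sidesteps sqlite3's default rule that a connection may only be used by the thread that created it.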

5. Full System Integration

```python
import asyncio
import signal
import sys
from typing import Dict, List

import pandas as pd

# Uses logger, StockDataFetcher, StockWebSocketClient, AlphaVantageClient
# and StockDatabase from the previous sections (same module).


class StockMonitorSystem:
    """Main program of the stock monitoring system"""

    def __init__(self, config: Dict):
        self.config = config
        self.fetcher = None
        self.db = StockDatabase(config.get('database_path', 'stocks.db'))
        self.websocket_client = None
        self.running = False
        # Stocks to monitor
        self.symbols = config.get('symbols', ['000001', '000002', '600519'])
        # Install signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Ask the main loop to stop; calling sys.exit() here would skip cleanup"""
        logger.info("Exit signal received, shutting down...")
        self.running = False

    async def websocket_callback(self, data: List[Dict]):
        """Callback for WebSocket data"""
        logger.info(f"Received {len(data)} real-time records")
        # Save to the database
        stock_dict = {item['symbol']: item for item in data}
        await self.db.save_realtime_data(stock_dict)
        # Log the latest prices (first 3 stocks only)
        for item in data[:3]:
            logger.info(f"{item['name']}({item['symbol']}): ¥{item['price']} "
                        f"({item.get('change_percent', 0):+.2f}%)")

    async def fetch_historical_data(self):
        """Fetch historical data from Alpha Vantage"""
        alpha_client = AlphaVantageClient(self.config.get('alpha_vantage_key'))
        tasks = []
        for symbol in self.symbols:
            # Alpha Vantage marks Shanghai listings with .SHH and Shenzhen with .SHZ
            # (its intraday coverage of A-shares may be limited)
            suffix = 'SHH' if symbol.startswith('6') else 'SHZ'
            tasks.append(alpha_client.get_intraday_data(f"{symbol}.{suffix}"))
        try:
            results = await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            await alpha_client.client.aclose()

        for symbol, df in zip(self.symbols, results):
            if isinstance(df, Exception):
                logger.error(f"Historical fetch failed for {symbol}: {df}")
            elif isinstance(df, pd.DataFrame) and not df.empty:
                # historical_data keeps one row per (symbol, date), so collapse
                # the intraday bars into daily OHLCV before saving
                daily = df.sort_index().resample('D').agg({
                    'open': 'first', 'high': 'max', 'low': 'min',
                    'close': 'last', 'volume': 'sum'
                }).dropna()
                await self.db.save_historical_data(symbol, daily)
                logger.info(f"Saved historical data for {symbol}: {len(daily)} rows")

    async def run_realtime_monitoring(self):
        """Run real-time monitoring"""
        async with StockDataFetcher(max_concurrent=5) as fetcher:
            self.fetcher = fetcher
            # Start the WebSocket client in the background
            self.websocket_client = StockWebSocketClient(self.websocket_callback)
            websocket_task = asyncio.create_task(
                self.websocket_client.connect(self.symbols)
            )
            # Poll the HTTP interface periodically as a backup
            while self.running:
                try:
                    # Fetch HTTP data every 10 seconds
                    await asyncio.sleep(10)
                    realtime_data = await fetcher.fetch_sina_realtime(self.symbols)
                    if realtime_data:
                        await self.db.save_realtime_data(realtime_data)
                        logger.debug(f"Saved HTTP backup data: {len(realtime_data)} records")
                except Exception as e:
                    logger.error(f"Real-time monitoring error: {e}")
            # Cleanup: close the WebSocket and make sure its task has finished
            await self.websocket_client.disconnect()
            websocket_task.cancel()
            await asyncio.gather(websocket_task, return_exceptions=True)

    async def run(self):
        """Run the main program"""
        self.running = True
        logger.info("Stock monitoring system starting...")
        logger.info(f"Monitoring: {self.symbols}")
        logger.info("Fetching historical data...")
        await self.fetch_historical_data()
        logger.info("Starting real-time monitoring...")
        await self.run_realtime_monitoring()


# Configuration and entry point
async def main():
    config = {
        'symbols': ['000001', '000002', '600519', '000858', '300750'],
        'alpha_vantage_key': 'YOUR_API_KEY_HERE',  # replace with your API key
        'database_path': 'stocks.db'
    }
    monitor = StockMonitorSystem(config)
    await monitor.run()


if __name__ == "__main__":
    # On Windows, set the selector event loop policy
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram exited")
```
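The HTTP backup loop above checks `self.running` only once per 10-second cycle, so a flag-based stop waits for the current sleep to end. Waiting on an `asyncio.Event` with a timeout keeps the same cadence but reacts to a stop request immediately. The helper below is a hypothetical sketch; on Unix the event could be set from `loop.add_signal_handler(signal.SIGINT, stop.set)`, which Windows event loops do not support.

```python
import asyncio
from typing import Awaitable, Callable


async def poll_until_stopped(
    poll: Callable[[], Awaitable[None]],
    stop: asyncio.Event,
    interval: float = 10.0,
) -> int:
    """Call poll() every `interval` seconds until `stop` is set; return the poll count."""
    polls = 0
    while not stop.is_set():
        try:
            # Returns as soon as stop is set; times out after one interval otherwise
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            polls += 1
            await poll()
    return polls
```

In the monitoring loop, `poll` would wrap `fetch_sina_realtime()` plus `save_realtime_data()`, and the shutdown path would call `stop.set()` instead of flipping a boolean.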

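6. Performance Optimization and Caveats

6.1 Performance Tips

  1. Connection pooling: reuse a single aiohttp ClientSession so its connection pool cuts connection overhead

  2. Data caching: cache data that rarely changes

  3. Batch operations: use batch inserts for database writes

  4. Error retries: implement retries with exponential backoff

  5. Memory management: release data that is no longer needed promptly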
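Tip 2 can be as simple as a time-to-live cache in front of slowly changing lookups such as `get_company_overview()`. A minimal sketch; the class name is illustrative, the clock is injectable so expiry can be tested without sleeping, and expired entries are dropped only when they are next read, so a long-running process with many distinct keys would also need periodic cleanup (tip 5).

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Tiny cache whose entries expire `ttl` seconds after they are stored."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self._clock = clock
        self._store: Dict[Any, Tuple[float, Any]] = {}

    def get(self, key: Any, default: Any = None) -> Any:
        entry = self._store.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # lazily evict the expired entry
            return default
        return value

    def set(self, key: Any, value: Any) -> None:
        self._store[key] = (self._clock() + self.ttl, value)
```

In use, check `cache.get(symbol)` first and call `get_company_overview()` followed by `cache.set(symbol, info)` only on a miss; with a TTL of an hour, each company costs at most one request per hour.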

6.2 Adding a Retry Mechanism

```python
import asyncio
import logging
import random
from functools import wraps

logger = logging.getLogger(__name__)


def retry(max_attempts=3, delay_range=(1, 5)):
    """Retry decorator for async functions: exponential backoff with random jitter"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        # Random base delay, doubled after each failed attempt
                        delay = random.uniform(*delay_range) * (2 ** attempt)
                        logger.warning(f"Retrying {func.__name__} (retry {attempt + 1}), "
                                       f"waiting {delay:.1f}s")
                        await asyncio.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
```

7. Data Visualization Example

```python
import plotly.graph_objects as go
from plotly.subplots import make_subplots


class StockVisualizer:
    """Stock data visualization"""

    @staticmethod
    def plot_realtime_prices(db_path: str, symbol: str):
        """Plot recent prices and volume for one symbol"""
        db = StockDatabase(db_path)
        df = db.get_latest_data(symbol, limit=100)
        if df.empty:
            print("No data to display")
            return
        # get_latest_data() returns newest rows first; plot in time order
        df = df.sort_values('timestamp')

        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=('Price', 'Volume'),
            vertical_spacing=0.1,
            row_heights=[0.7, 0.3]
        )
        fig.add_trace(
            go.Scatter(x=df['timestamp'], y=df['price'], mode='lines', name='Price'),
            row=1, col=1
        )
        fig.add_trace(
            go.Bar(x=df['timestamp'], y=df['volume'], name='Volume'),
            row=2, col=1
        )
        fig.update_layout(
            title=f'{symbol} Real-Time Data',
            hovermode='x unified',
            showlegend=True
        )
        fig.show()
```
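One caveat for the volume subplot: the Sina volume field holds the cumulative volume traded so far that day, so plotting stored snapshots directly shows a rising running total rather than per-interval activity. Differencing consecutive snapshots recovers per-interval volume. `interval_volumes` is a hypothetical helper; it expects snapshots in ascending time order and treats a drop in the running total as the start of a new trading day.

```python
from typing import List, Optional


def interval_volumes(cumulative: List[int]) -> List[Optional[int]]:
    """Convert cumulative intraday volume snapshots into per-interval volume."""
    result: List[Optional[int]] = []
    previous: Optional[int] = None
    for value in cumulative:
        if previous is None:
            result.append(None)           # no earlier snapshot to diff against
        elif value < previous:
            result.append(value)          # counter reset: a new trading day began
        else:
            result.append(value - previous)
        previous = value
    return result
```

With the DataFrame from `get_latest_data()` (newest first), reverse it before differencing, e.g. `df = df.iloc[::-1]` followed by `df['interval_volume'] = interval_volumes(df['volume'].tolist())`, and plot that column in the bar trace.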

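8. Legal and Ethical Considerations

  1. Follow site rules: check robots.txt and respect crawl-rate limits

  2. Data use: for personal study and research only

  3. API limits: comply with each API provider's terms of service

  4. Privacy: do not collect personal data

  5. Commercial use: purchase a licensed data service for any commercial use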
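Python's standard library can check the first rule automatically: `urllib.robotparser` evaluates robots.txt rules and exposes any `Crawl-delay`. The sketch parses lines you have already downloaded, so it needs no network access; `robots_policy` is a hypothetical helper name, and `RobotFileParser.set_url()` plus `read()` can fetch a site's `/robots.txt` directly instead.

```python
from typing import Iterable, Optional, Tuple
from urllib import robotparser


def robots_policy(robots_lines: Iterable[str], user_agent: str,
                  url: str) -> Tuple[bool, Optional[int]]:
    """Return (allowed, crawl_delay_seconds) for url under the given robots.txt lines."""
    parser = robotparser.RobotFileParser()
    parser.parse(robots_lines)  # parse() also marks the rules as loaded
    return parser.can_fetch(user_agent, url), parser.crawl_delay(user_agent)
```

A returned crawl delay can feed straight into the polling interval or into a rate limiter, so the crawler slows down to whatever the site asks for.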

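9. Summary

This article walked through building a complete real-time stock data scraping system in Python. The key techniques were:

  1. Asynchronous programming: higher scraping throughput

  2. WebSocket: genuine real-time data push

  3. Multiple data sources: more reliable data

  4. Database management: efficient storage and querying

  5. Error handling: a more stable system

The system can be extended as needed, for example with:

  • Technical indicator calculation

  • Price alerts

  • Automated trading strategies

  • Mobile notifications

  • Big-data analysis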
