news 2026/5/26 19:16:00

量化交易系统:历史行情 API 批量拉取与回测数据清洗

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
量化交易系统:历史行情 API 批量拉取与回测数据清洗

做量化交易的人都知道,回测系统的核心不是策略有多花哨,而是数据有多可靠
如果历史行情数据本身就有问题,那么再完美的回测结果也只是“垃圾进,垃圾出”。

本文从实战出发,聊聊如何通过 API 批量拉取历史行情数据,并做一套严谨的回测数据清洗流程。这些坑,我都踩过。

一、为什么历史行情数据这么难搞?

很多人以为历史行情就是“股票代码+日期+开高低收+成交量”。真上手才发现,问题一大堆:

  • 不同数据源格式不同,有的前复权、有的后复权、有的不复权
  • 停牌日、除权除息日、涨跌停板数据容易被忽略
  • API 限流、断点续传、数据缺失需要处理
  • 国内 A 股、美股、期货的数据格式和规则差异巨大

一个合格的量化回测系统,必须能从源头保证数据的完整性、一致性、无偏性

二、批量拉取的工程设计

2.1 基础思路

不要一次性拉全部历史数据,更不要写死日期。合理的设计应该是:

配置股票池 → 判断本地已有数据 → 只拉缺失区间 → 合并去重 → 校验一致性

2.2 代码示例:带断点续传的批量拉取

下面使用 iTick API 获取历史日线数据(前复权),并实现本地缓存与断点续传。

importrequestsimportpandasaspdimporttimefrompathlibimportPath API_TOKEN="your_token_here"# 替换为实际 TokenBASE_URL="https://api.itick.org"defbuild_headers():"""构造请求头,包含 API Token 验证"""return{"token":API_TOKEN,"Content-Type":"application/json"}deffetch_stock_history(stock_code,region="HK",k_type=8,start_date="20000101",end_date="20231231",cache_dir="./data/raw"):""" 带缓存的批量拉取,自动断点续传 参数说明: stock_code : 股票代码(港股示例:00700) region : 市场代码(HK/US/SZ/SH 等) k_type : K线类型(8:日线,9:周线,10:月线) start_date : 开始日期(格式 YYYYMMDD) end_date : 结束日期(格式 YYYYMMDD) cache_dir : 本地缓存目录 """Path(cache_dir).mkdir(parents=True,exist_ok=True)cache_file=Path(cache_dir)/f"{stock_code}.parquet"# 已有数据则加载,仅拉取缺失区间ifcache_file.exists():df_old=pd.read_parquet(cache_file)df_old['trade_date']=pd.to_datetime(df_old['trade_date'])last_date=df_old['trade_date'].max()start_date=(last_date+pd.Timedelta(days=1)).strftime('%Y%m%d')ifstart_date>end_date:returndf_oldprint(f"{stock_code}: 本地已有数据至{last_date.date()},开始增量拉取...")else:df_old=pd.DataFrame()# 将日期范围转换为时间戳(iTick kType 模式下需通过 et 参数控制截止)start_ts=int(pd.Timestamp(start_date).timestamp())end_ts=int(pd.Timestamp(end_date).timestamp())all_data=[]current_end_ts=end_ts batch_days=100# 每批最多拉取约 100 个交易日whileTrue:# 计算当前批次的起始截止区间(基于天数回推)batch_start_ts=max(start_ts,current_end_ts-batch_days*86400)params={"region":region,"code":stock_code,"kType":k_type,"limit":500,# 每次最多返回 500 根 K 线"et":current_end_ts}try:url=f"{BASE_URL}/stock/kline"resp=requests.get(url,headers=build_headers(),params=params,timeout=15)ifresp.status_code!=200:print(f"拉取失败:{stock_code}, 状态码{resp.status_code}")time.sleep(2)continuedata=resp.json()ifdata.get("code")==0anddata.get("data"):batch_data=data["data"]all_data.extend(batch_data)print(f"{stock_code}: 拉取到{len(batch_data)}条数据")# 判断是否还有更早的数据earliest_ts=batch_data[-1].get("t",0)ifbatch_dataelse0ifearliest_ts<=start_tsorlen(batch_data)<500:breakcurrent_end_ts=earliest_ts-86400# 继续拉取更早数据else:print(f"拉取失败:{stock_code}, 错误信息:{data.get('msg')}")breaktime.sleep(0.5)# 限流控制exceptExceptionase:print(f"拉取异常:{stock_code}, 错误:{e}")time.sleep(5)continueifnotall_data:returndf_old# 数据转换与合并df_new=pd.DataFrame(all_data)# 将时间戳转换为日期df_new['trade_date']=pd.to_datetime(df_new['t'],unit='s')# 重命名字段为统一格式df_new=df_new.rename(columns={'o':'open','h':'high','l':'low','c':'close','v':'volume'})df_new=df_new[['trade_date','open','high','low','close','volume']]df_combined=pd.concat([df_old,df_new],ignore_index=True)ifnotdf_old.emptyelsedf_new df_combined=df_combined.drop_duplicates(subset=['trade_date']).sort_values('trade_date')df_combined.to_parquet(cache_file,index=False)print(f"{stock_code}: 数据保存至{cache_file}, 共计{len(df_combined)}条")returndf_combined

这个函数做了几件关键的事:

  • 检查本地缓存(Parquet 格式),只拉取缺失区间
  • 通过limit和分批区间控制拉取量,支持大量历史数据的自动分页
  • 异常重试与限流睡眠
  • 时间戳自动转换为标准化日期字段

2.3 多股票并发拉取

单线程循环拉取效率较低,可使用线程池实现并发,但仍需控制并发数以避免 API 限流:

fromconcurrent.futuresimportThreadPoolExecutor,as_completeddeffetch_batch(stock_list,region="HK",max_workers=3):""" 批量拉取多只股票的历史数据 max_workers: 并发数建议 ≤ 5,防止被限流 """results={}withThreadPoolExecutor(max_workers=max_workers)asexecutor:futures={executor.submit(fetch_stock_history,code,region):codeforcodeinstock_list}forfutureinas_completed(futures):code=futures[future]try:results[code]=future.result()print(f"{code}: 拉取完成")exceptExceptionase:print(f"{code}: 拉取失败, 错误:{e}")returnresults

并发数建议不超过5,否则容易被数据源封禁。

三、回测数据清洗 Checklist

拉下来的原始数据,离直接用于回测还差好几步。这是我总结的清洗流程,每一步都不能省。

3.1 时间轴处理

# 确保交易日连续,无跳空defalign_trading_days(df,trading_calendar=None):df['trade_date']=pd.to_datetime(df['trade_date'])df=df.sort_values('trade_date').set_index('trade_date')iftrading_calendarisNone:# 生成完整日历(工作日频率)full_calendar=pd.date_range(start=df.index.min(),end=df.index.max(),freq='B')else:full_calendar=trading_calendar df=df.reindex(full_calendar)returndf

用工作日频率(freq='B')生成完整日历,缺失日期会自动填入 NaN,后续再填充或标记。

3.2 除权除息与复权统一

这是最大的坑!

很多新手直接用不复权数据做回测,结果会发现某天价格突然跳空低开 30%(实际上是除权),策略却以为是大跌而错误开平仓。

最佳实践:全程使用前复权(qfq)数据,保持历史价格连续可比。但要注意,前复权会导致早期价格出现负数(极端分红),需要做截断处理:

# 剔除前复权后的负价格或极小价格df=df[(df['close']>0.01)&(df['high']>0.01)]

3.3 涨跌停板标记

回测时,如果策略根据信号在涨停价买入,实际根本无法成交。需要提前标记:

# 计算涨跌停价(A股主板±10%,科创/创业±20%,港股无涨跌停板限制)defcalc_limit_prices(df,stock_code):# 根据股票代码判断市场ifstock_code.startswith('688')orstock_code.startswith('300'):limit_pct=0.20# 科创板/创业板elifstock_code.startswith('600')orstock_code.startswith('000'):limit_pct=0.10# A股主板else:# 港股无涨跌停板限制,直接返回df['is_limit_up']=Falsedf['is_limit_down']=Falsereturndf df['prev_close']=df['close'].shift(1)df['upper_limit']=df['prev_close']*(1+limit_pct)df['lower_limit']=df['prev_close']*(1-limit_pct)# 标记一字板df['is_limit_up']=(df['open']>=df['upper_limit']-0.001)&(df['close']>=df['upper_limit']-0.001)df['is_limit_down']=(df['open']<=df['lower_limit']+0.001)&(df['close']<=df['lower_limit']+0.001)returndf

回测执行时,遇到is_limit_up且为买入信号,应跳过或转换策略。

3.4 停牌数据处理

停牌期间,没有成交,不应填充为前一日价格(会导致回测出现不合理收益)。正确做法:

# 停牌日成交量应该为0或NaN,不做前向填充df['volume']=df['volume'].fillna(0)# 对于价格字段,停牌日保持NaN,后续回测引擎遇到NaN应直接跳过该日

3.5 数据对齐(多股票回测)

多股票回测时,需将所有股票对齐到同一个交易日历:

defalign_multi_stocks(stock_dfs,trading_days):""" stock_dfs: dict {code: DataFrame} trading_days: 交易日列表(pd.DatetimeIndex) """aligned={}forcode,dfinstock_dfs.items():df_aligned=df.set_index('trade_date').reindex(trading_days)aligned[code]=df_alignedreturnaligned

四、数据质量校验

清洗完毕后,一定要跑一遍自动化校验:

defvalidate_data(df,stock_code):checks={"是否有重复日期":df.index.duplicated().sum()==0,"是否有空价格":df[['open','high','low','close']].isna().any().any()==False,"最低价是否高于最高价":(df['low']<=df['high']).all(),"成交量是否非负":(df['volume']>=0).all(),"价格序列是否单调异常":((df['close']-df['close'].shift(1)).abs()/df['close'].shift(1)<0.2).all(),# 除去涨跌停}forname,resultinchecks.items():print(f"{stock_code}-{name}:{'通过'ifresultelse'失败'}")returnall(checks.values())

五、存储与版本管理建议

  • 格式:强烈推荐ParquetFeather,比 CSV 快 10 倍以上,且占用空间小。
  • 目录结构
    data/ raw/ # 原始API拉取数据(按股票保存) cleaned/ # 清洗后数据(已复权、对齐、填充) meta/ # 股票列表、交易日历、除权因子备份
  • 版本控制:历史数据不要放 Git,用DVC(Data Version Control)或直接云存储(S3、OSS)。

六、个人建议

  1. 永远保留原始拉取数据,清洗脚本可重复执行。否则哪天发现清洗逻辑错了,你还得全部重拉。
  2. 不要完美主义。回测数据做不到 100%精确,但必须保证无偏性(误差在买卖双方随机出现)。
  3. 先验小样本。对某只股票拉 3 年数据,手动核对除权除息日、涨跌停日,确信流程正确后再批量跑。
  4. 备胎数据源。核心股票池至少准备两个数据源交叉验证。

最后,记住一句话:回测是用来排除坏策略的,不是用来证明好策略的。而这一切的起点,就是靠谱的历史行情数据。希望这篇文章能帮你少走弯路。

参考文档:https://docs.itick.org/websocket/stocks
GitHub:https://github.com/itick-org/

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/26 19:14:05

从OpenWrt拨号异常到网络畅通:一次MTU值的精准调优实战

1. 当OpenWrt拨号变得异常缓慢时 最近给家里的路由器刷了最新的OpenWrt固件&#xff0c;本以为能享受更强大的功能和更稳定的网络&#xff0c;结果却遇到了一个让人头疼的问题&#xff1a;ADSL拨号变得异常缓慢。WAN口经常连接不上&#xff0c;有时候要等好几分钟才能勉强连上…

作者头像 李华
网站建设 2026/5/26 19:13:50

Print.js完整指南:5分钟掌握网页打印的最佳实践

Print.js完整指南&#xff1a;5分钟掌握网页打印的最佳实践 【免费下载链接】Print.js A tiny javascript library to help printing from the web. 项目地址: https://gitcode.com/gh_mirrors/pr/Print.js 想要在网页应用中实现专业级的打印功能吗&#xff1f;Print.js…

作者头像 李华
网站建设 2026/5/26 19:07:06

这份榜单够用!盘点2026年全网顶尖的的降AIGC平台

轻松降低论文AI率在2026年已不再是天方夜谭。以下是2026年最炸裂、实测效果显著的降AIGC平台神器&#xff0c;覆盖AI痕迹消除、文本改写润色、降重优化、学术合规检测四大核心场景&#xff0c;帮你稳妥搞定毕业论文。 一、全流程王者&#xff1a;一站式搞定论文全链路 这类工…

作者头像 李华
网站建设 2026/5/26 19:04:24

基于ENS210传感器与Arduino的高精度露点监测仪设计与实现

1. 项目概述&#xff1a;为什么我们需要精确测量露点&#xff1f;在电子制作和智能家居领域&#xff0c;温湿度传感器项目很常见&#xff0c;但大多数都停留在显示当前环境数值的层面。然而&#xff0c;真正影响我们生活质量的&#xff0c;往往不是单一的温湿度读数&#xff0c…

作者头像 李华