做量化交易的人都知道,回测系统的核心不是策略有多花哨,而是数据有多可靠。
如果历史行情数据本身就有问题,那么再完美的回测结果也只是“垃圾进,垃圾出”。
本文从实战出发,聊聊如何通过 API 批量拉取历史行情数据,并做一套严谨的回测数据清洗流程。这些坑,我都踩过。
一、为什么历史行情数据这么难搞?
很多人以为历史行情就是“股票代码+日期+开高低收+成交量”。真上手才发现,问题一大堆:
- 不同数据源格式不同,有的前复权、有的后复权、有的不复权
- 停牌日、除权除息日、涨跌停板数据容易被忽略
- API 限流、断点续传、数据缺失需要处理
- 国内 A 股、美股、期货的数据格式和规则差异巨大
一个合格的量化回测系统,必须能从源头保证数据的完整性、一致性、无偏性。
二、批量拉取的工程设计
2.1 基础思路
不要一次性拉全部历史数据,更不要写死日期。合理的设计应该是:
配置股票池 → 判断本地已有数据 → 只拉缺失区间 → 合并去重 → 校验一致性2.2 代码示例:带断点续传的批量拉取
下面使用 iTick API 获取历史日线数据(前复权),并实现本地缓存与断点续传。
importrequestsimportpandasaspdimporttimefrompathlibimportPath API_TOKEN="your_token_here"# 替换为实际 TokenBASE_URL="https://api.itick.org"defbuild_headers():"""构造请求头,包含 API Token 验证"""return{"token":API_TOKEN,"Content-Type":"application/json"}deffetch_stock_history(stock_code,region="HK",k_type=8,start_date="20000101",end_date="20231231",cache_dir="./data/raw"):""" 带缓存的批量拉取,自动断点续传 参数说明: stock_code : 股票代码(港股示例:00700) region : 市场代码(HK/US/SZ/SH 等) k_type : K线类型(8:日线,9:周线,10:月线) start_date : 开始日期(格式 YYYYMMDD) end_date : 结束日期(格式 YYYYMMDD) cache_dir : 本地缓存目录 """Path(cache_dir).mkdir(parents=True,exist_ok=True)cache_file=Path(cache_dir)/f"{stock_code}.parquet"# 已有数据则加载,仅拉取缺失区间ifcache_file.exists():df_old=pd.read_parquet(cache_file)df_old['trade_date']=pd.to_datetime(df_old['trade_date'])last_date=df_old['trade_date'].max()start_date=(last_date+pd.Timedelta(days=1)).strftime('%Y%m%d')ifstart_date>end_date:returndf_oldprint(f"{stock_code}: 本地已有数据至{last_date.date()},开始增量拉取...")else:df_old=pd.DataFrame()# 将日期范围转换为时间戳(iTick kType 模式下需通过 et 参数控制截止)start_ts=int(pd.Timestamp(start_date).timestamp())end_ts=int(pd.Timestamp(end_date).timestamp())all_data=[]current_end_ts=end_ts batch_days=100# 每批最多拉取约 100 个交易日whileTrue:# 计算当前批次的起始截止区间(基于天数回推)batch_start_ts=max(start_ts,current_end_ts-batch_days*86400)params={"region":region,"code":stock_code,"kType":k_type,"limit":500,# 每次最多返回 500 根 K 线"et":current_end_ts}try:url=f"{BASE_URL}/stock/kline"resp=requests.get(url,headers=build_headers(),params=params,timeout=15)ifresp.status_code!=200:print(f"拉取失败:{stock_code}, 状态码{resp.status_code}")time.sleep(2)continuedata=resp.json()ifdata.get("code")==0anddata.get("data"):batch_data=data["data"]all_data.extend(batch_data)print(f"{stock_code}: 拉取到{len(batch_data)}条数据")# 判断是否还有更早的数据earliest_ts=batch_data[-1].get("t",0)ifbatch_dataelse0ifearliest_ts<=start_tsorlen(batch_data)<500:breakcurrent_end_ts=earliest_ts-86400# 继续拉取更早数据else:print(f"拉取失败:{stock_code}, 错误信息:{data.get('msg')}")breaktime.sleep(0.5)# 限流控制exceptExceptionase:print(f"拉取异常:{stock_code}, 错误:{e}")time.sleep(5)continueifnotall_data:returndf_old# 数据转换与合并df_new=pd.DataFrame(all_data)# 将时间戳转换为日期df_new['trade_date']=pd.to_datetime(df_new['t'],unit='s')# 重命名字段为统一格式df_new=df_new.rename(columns={'o':'open','h':'high','l':'low','c':'close','v':'volume'})df_new=df_new[['trade_date','open','high','low','close','volume']]df_combined=pd.concat([df_old,df_new],ignore_index=True)ifnotdf_old.emptyelsedf_new df_combined=df_combined.drop_duplicates(subset=['trade_date']).sort_values('trade_date')df_combined.to_parquet(cache_file,index=False)print(f"{stock_code}: 数据保存至{cache_file}, 共计{len(df_combined)}条")returndf_combined这个函数做了几件关键的事:
- 检查本地缓存(Parquet 格式),只拉取缺失区间
- 通过
limit和分批区间控制拉取量,支持大量历史数据的自动分页 - 异常重试与限流睡眠
- 时间戳自动转换为标准化日期字段
2.3 多股票并发拉取
单线程循环拉取效率较低,可使用线程池实现并发,但仍需控制并发数以避免 API 限流:
fromconcurrent.futuresimportThreadPoolExecutor,as_completeddeffetch_batch(stock_list,region="HK",max_workers=3):""" 批量拉取多只股票的历史数据 max_workers: 并发数建议 ≤ 5,防止被限流 """results={}withThreadPoolExecutor(max_workers=max_workers)asexecutor:futures={executor.submit(fetch_stock_history,code,region):codeforcodeinstock_list}forfutureinas_completed(futures):code=futures[future]try:results[code]=future.result()print(f"{code}: 拉取完成")exceptExceptionase:print(f"{code}: 拉取失败, 错误:{e}")returnresults并发数建议不超过5,否则容易被数据源封禁。
三、回测数据清洗 Checklist
拉下来的原始数据,离直接用于回测还差好几步。这是我总结的清洗流程,每一步都不能省。
3.1 时间轴处理
# 确保交易日连续,无跳空defalign_trading_days(df,trading_calendar=None):df['trade_date']=pd.to_datetime(df['trade_date'])df=df.sort_values('trade_date').set_index('trade_date')iftrading_calendarisNone:# 生成完整日历(工作日频率)full_calendar=pd.date_range(start=df.index.min(),end=df.index.max(),freq='B')else:full_calendar=trading_calendar df=df.reindex(full_calendar)returndf用工作日频率(freq='B')生成完整日历,缺失日期会自动填入 NaN,后续再填充或标记。
3.2 除权除息与复权统一
这是最大的坑!
很多新手直接用不复权数据做回测,结果会发现某天价格突然跳空低开 30%(实际上是除权),策略却以为是大跌而错误开平仓。
最佳实践:全程使用前复权(qfq)数据,保持历史价格连续可比。但要注意,前复权会导致早期价格出现负数(极端分红),需要做截断处理:
# 剔除前复权后的负价格或极小价格df=df[(df['close']>0.01)&(df['high']>0.01)]3.3 涨跌停板标记
回测时,如果策略根据信号在涨停价买入,实际根本无法成交。需要提前标记:
# 计算涨跌停价(A股主板±10%,科创/创业±20%,港股无涨跌停板限制)defcalc_limit_prices(df,stock_code):# 根据股票代码判断市场ifstock_code.startswith('688')orstock_code.startswith('300'):limit_pct=0.20# 科创板/创业板elifstock_code.startswith('600')orstock_code.startswith('000'):limit_pct=0.10# A股主板else:# 港股无涨跌停板限制,直接返回df['is_limit_up']=Falsedf['is_limit_down']=Falsereturndf df['prev_close']=df['close'].shift(1)df['upper_limit']=df['prev_close']*(1+limit_pct)df['lower_limit']=df['prev_close']*(1-limit_pct)# 标记一字板df['is_limit_up']=(df['open']>=df['upper_limit']-0.001)&(df['close']>=df['upper_limit']-0.001)df['is_limit_down']=(df['open']<=df['lower_limit']+0.001)&(df['close']<=df['lower_limit']+0.001)returndf回测执行时,遇到is_limit_up且为买入信号,应跳过或转换策略。
3.4 停牌数据处理
停牌期间,没有成交,不应填充为前一日价格(会导致回测出现不合理收益)。正确做法:
# 停牌日成交量应该为0或NaN,不做前向填充df['volume']=df['volume'].fillna(0)# 对于价格字段,停牌日保持NaN,后续回测引擎遇到NaN应直接跳过该日3.5 数据对齐(多股票回测)
多股票回测时,需将所有股票对齐到同一个交易日历:
defalign_multi_stocks(stock_dfs,trading_days):""" stock_dfs: dict {code: DataFrame} trading_days: 交易日列表(pd.DatetimeIndex) """aligned={}forcode,dfinstock_dfs.items():df_aligned=df.set_index('trade_date').reindex(trading_days)aligned[code]=df_alignedreturnaligned四、数据质量校验
清洗完毕后,一定要跑一遍自动化校验:
defvalidate_data(df,stock_code):checks={"是否有重复日期":df.index.duplicated().sum()==0,"是否有空价格":df[['open','high','low','close']].isna().any().any()==False,"最低价是否高于最高价":(df['low']<=df['high']).all(),"成交量是否非负":(df['volume']>=0).all(),"价格序列是否单调异常":((df['close']-df['close'].shift(1)).abs()/df['close'].shift(1)<0.2).all(),# 除去涨跌停}forname,resultinchecks.items():print(f"{stock_code}-{name}:{'通过'ifresultelse'失败'}")returnall(checks.values())五、存储与版本管理建议
- 格式:强烈推荐Parquet或Feather,比 CSV 快 10 倍以上,且占用空间小。
- 目录结构:
data/ raw/ # 原始API拉取数据(按股票保存) cleaned/ # 清洗后数据(已复权、对齐、填充) meta/ # 股票列表、交易日历、除权因子备份 - 版本控制:历史数据不要放 Git,用DVC(Data Version Control)或直接云存储(S3、OSS)。
六、个人建议
- 永远保留原始拉取数据,清洗脚本可重复执行。否则哪天发现清洗逻辑错了,你还得全部重拉。
- 不要完美主义。回测数据做不到 100%精确,但必须保证无偏性(误差在买卖双方随机出现)。
- 先验小样本。对某只股票拉 3 年数据,手动核对除权除息日、涨跌停日,确信流程正确后再批量跑。
- 备胎数据源。核心股票池至少准备两个数据源交叉验证。
最后,记住一句话:回测是用来排除坏策略的,不是用来证明好策略的。而这一切的起点,就是靠谱的历史行情数据。希望这篇文章能帮你少走弯路。
参考文档:https://docs.itick.org/websocket/stocks
GitHub:https://github.com/itick-org/