Python+QMT全自动获取股票Tick数据实战指南
在量化交易领域,获取高质量的Tick级数据是构建有效策略的基础。传统手动下载方式不仅效率低下,还容易出错。本文将手把手教你如何用Python调用国信QMT的get_market_data_ex接口,实现股票历史Tick数据的全自动获取与处理。
1. 环境准备与QMT配置
1.1 安装必要依赖
确保你的Python环境已安装以下基础库:
pip install pandas numpy pytz注意:QMT客户端通常自带Python环境,建议使用其内置解释器以避免兼容性问题
1.2 QMT权限配置
- 登录QMT交易终端
- 在"系统设置"中开启"量化交易权限"
- 申请"历史数据下载"权限(需券商审核)
不同券商权限开放程度可能不同,建议提前联系客户经理确认
2. 核心API详解与参数设置
2.1 get_market_data_ex接口解析
国信QMT提供的关键数据获取接口:
data = C.get_market_data_ex( fields=['open','high','low','close','volume','amount'], stock_code=['600000.SH'], period='tick', start_time='20240520093000', end_time='20240520150000', count=-1, dividend_type='follow', fill_data=True, subscribe=False )关键参数说明:
| 参数名 | 类型 | 说明 | 典型值 |
|---|---|---|---|
| period | str | 数据周期 | 'tick'(分笔)/'1m'(1分钟) |
| start_time | str | 开始时间 | 'YYYYMMDDHHMMSS' |
| fill_data | bool | 是否填充缺失数据 | True/False |
| count | int | 获取数据条数 | -1(全部) |
2.2 时间格式避坑指南
常见错误:直接使用datetime.now()生成的时间格式不兼容
正确做法:
from datetime import datetime def format_qmt_time(dt): return dt.strftime('%Y%m%d%H%M%S') # 获取今天9:30-11:30的数据 start = datetime.now().replace(hour=9, minute=30, second=0) end = datetime.now().replace(hour=11, minute=30, second=0) formatted_start = format_qmt_time(start) formatted_end = format_qmt_time(end)3. 完整数据获取流程实现
3.1 基础数据获取框架
# encoding: utf-8 import pandas as pd from pytz import timezone def init(C): # 设置显示选项 pd.set_option('display.max_rows', 500) pd.set_option('display.max_columns', 10) pd.set_option('display.width', 1000) # 获取平安银行tick数据 stock_code = '000001.SZ' trade_date = '20240520' tick_data = get_history_ticks( C, stock_code=stock_code, date_str=trade_date ) # 计算逐笔成交额变化 tick_data['delta_amount'] = tick_data['amount'].diff() print(tick_data.head(10)) def get_history_ticks(C, stock_code, date_str): """获取单日全部tick数据""" start_time = f"{date_str}093000" end_time = f"{date_str}150000" data = C.get_market_data_ex( ['price', 'volume', 'amount', 'bid', 'ask'], [stock_code], period='tick', start_time=start_time, end_time=end_time, count=-1, fill_data=False ) return data[stock_code]3.2 多日数据批量获取
def batch_get_ticks(C, stock_code, start_date, end_date): """获取多日tick数据""" date_range = pd.date_range(start_date, end_date) all_data = [] for date in date_range: if date.weekday() >= 5: # 跳过周末 continue date_str = date.strftime('%Y%m%d') try: daily_data = get_history_ticks(C, stock_code, date_str) daily_data['trade_date'] = date_str all_data.append(daily_data) except Exception as e: print(f"获取{date_str}数据失败: {str(e)}") return pd.concat(all_data) if all_data else None4. 高级数据处理技巧
4.1 Tick数据清洗与校验
常见数据问题处理方案:
异常值过滤:
def filter_abnormal_ticks(df): # 价格异常 df = df[(df['price'] > 0) & (df['price'] < df['price'].quantile(0.999))] # 成交量异常 df = df[df['volume'] < df['volume'].quantile(0.99)] return df时间连续性检查:
def check_time_gaps(df): df['datetime'] = pd.to_datetime(df.index) time_diff = df['datetime'].diff().dt.total_seconds() gaps = time_diff[time_diff > 5] # 超过5秒间隔 return gaps
4.2 成交快照重构
从逐笔数据重构盘口状态:
def reconstruct_orderbook(tick_df): book = { 'ask1': None, 'ask_vol1': None, 'bid1': None, 'bid_vol1': None } snapshots = [] for idx, row in tick_df.iterrows(): if not pd.isna(row['bid']): book['bid1'] = row['bid'][0][0] book['bid_vol1'] = row['bid'][0][1] if not pd.isna(row['ask']): book['ask1'] = row['ask'][0][0] book['ask_vol1'] = row['ask'][0][1] snapshot = book.copy() snapshot['price'] = row['price'] snapshot['volume'] = row['volume'] snapshot['time'] = idx snapshots.append(snapshot) return pd.DataFrame(snapshots).set_index('time')5. 性能优化与实战建议
5.1 数据获取加速技巧
并行下载:对多只股票使用线程池
from concurrent.futures import ThreadPoolExecutor def parallel_download(C, stock_list, date_str): with ThreadPoolExecutor(max_workers=5) as executor: futures = { stock: executor.submit(get_history_ticks, C, stock, date_str) for stock in stock_list } return {k: v.result() for k, v in futures.items()}增量更新:只获取最新数据
def get_incremental_ticks(C, stock_code, last_time): now = datetime.now(timezone('Asia/Shanghai')) start_time = format_qmt_time(last_time) end_time = format_qmt_time(now) return get_history_ticks(C, stock_code, start_time, end_time)
5.2 数据存储方案对比
| 存储方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| CSV | 简单直观 | 读写慢 | 小规模数据 |
| HDF5 | 高速IO | 单文件限制 | 中型数据集 |
| Parquet | 列式存储 | 需要额外库 | 大规模数据 |
| 数据库 | 易查询 | 需要维护 | 生产环境 |
推荐方案:
# 使用PyArrow保存Parquet tick_data.to_parquet('ticks.parquet', engine='pyarrow') # 带压缩版本 tick_data.to_parquet( 'ticks_compressed.parquet', engine='pyarrow', compression='snappy' )在实际项目中,这套自动化方案将数据获取时间从原来的小时级缩短到分钟级,且完全避免了人工操作可能带来的错误。一个特别实用的技巧是建立数据校验机制,在每天收盘后自动验证当日数据的完整性和准确性。