news 2026/2/3 6:37:43

金融数据实时行情API使用教程:如何跨市场查询多品种的实时行情数据

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
金融数据实时行情API使用教程:如何跨市场查询多品种的实时行情数据

在量化交易中,获取准确、及时的行情数据是策略执行的基础。本文将从概念到实操,详细介绍如何接入实时行情API,并给出具体代码示例。 如果您有查询股票、外汇、期货等多市场的需求,这篇内容你一定不能错过。

一、实时行情与延迟行情的区别

在交易系统中,行情数据通常分为两类:实时行情延迟行情

实时行情:

  • 数据尽可能接近市场发生的价格变动,延迟通常在毫秒级或秒级。

  • 对高频交易和短线策略尤为重要,因为价格的微小波动可能直接影响交易决策。

  • 优点:及时、精准;缺点:通常需要付费,并对系统性能有较高要求。

延迟行情:

  • 数据通常延迟几分钟到几十分钟,比如免费行情或部分财经网站提供的延迟数据。

  • 优点:可免费获取,系统压力小;

  • 缺点:对高频或精细策略几乎无用。

二、综合行情接口介绍

在量化交易中,我们可能需要跨市场、跨品种获取行情。传统接口往往局限于单市场或单品种,而综合行情接口可以一次性查询多个市场、多种资产类型的行情数据,包括:

  • A股、港股、美股

  • 外汇、加密货币

  • 期货等

例如,Infoway API提供的综合行情接口,可以让你在一次请求中获取多市场行情,大大简化数据管理和策略实现。

三、接入实时行情API

以 Infoway API 的批量K线查询接口为例,介绍如何获取实时K线数据。

3.1 HTTP接口请求说明

请求地址模板:

https://data.infoway.io/stock/v2/batch_kline/{klineType}/{klineNum}/{codes}

参数说明:

参数类型描述
klineTypeintK线周期:1=1分钟, 2=5分钟, 3=15分钟, 4=30分钟, 5=1小时, 6=2小时, 7=4小时, 8=日K, 9=周K, 10=月K, 11=季K, 12=年K
klineNumint查询K线数量,单产品最大500根
codesstring逗号分隔的股票代码,如002594.SZ,00285.HK,TSLA.US

下面是请求示例,我们一次性查询A股、港股和美股的最近10根1分钟K线:

import requests # 接口URL api_url = 'https://data.infoway.io/stock/batch_kline/1/10/002594.SZ%2C00285.HK%2CTSLA.US' # 设置请求头 headers = { 'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json', 'apiKey': 'yourApikey' # API Key申请:www.infoway.io } # 发送GET请求 response = requests.get(api_url, headers=headers) # 输出结果 print(f"HTTP code: {response.status_code}") print(f"message: {response.text}")

3.2 HTTP返回结果解析

返回示例中,每个s对应一个交易品种,respList为对应的K线数组。你可以通过遍历respList获取每根K线的开盘价、收盘价、最高价、最低价及成交量。

{ "ret": 200, "msg": "success", "traceId": "19814db2-42f7-4788-9b51-b2001bf17953", "data": [ { "s": "TSLA.US", "respList": [ { "t": "1751372340", "h": "298.620", "o": "298.439", "l": "298.100", "c": "298.310", "v": "24329", "vw": "7259092.235", "pc": "-0.02%", "pca": "-0.070" }, { "t": "1751372280", "h": "298.450", "o": "298.090", "l": "298.000", "c": "298.380", "v": "32214", "vw": "9607344.900", "pc": "0.10%", "pca": "0.290" } ] }, { "s": "01810.HK", "respList": [ { "t": "1751270400", "h": "59.950", "o": "59.950", "l": "59.950", "c": "59.950", "v": "23669600", "vw": "1418992520.000", "pc": "0.50%", "pca": "0.300" }, { "t": "1751270340", "h": "59.700", "o": "59.650", "l": "59.650", "c": "59.650", "v": "829002", "vw": "49466778.300", "pc": "-0.08%", "pca": "-0.050" } ] } ] }

返回字段说明:

字段名类型描述示例
tstring成交时间(秒时间戳)1751270340
hstring最高价18.01
ostring开盘价18.01
lstring最低价18.01
cstring收盘价18.01
vstring成交量18000
vmstring成交额20000
pcstring涨跌幅0.12%
pcastring涨跌额0.11

四、使用WebSocket获取实时行情

在量化交易中,WebSocket是获取高频、低延迟行情的另一种重要方式,相比HTTP轮询有明显优势。

4.1 WebSocket的优势

实时性更强
WebSocket是长连接协议,一旦建立连接,服务器可以主动推送数据,无需客户端频繁请求。 这对高频交易和策略回测实时性要求高的场景尤其重要。

而且WS能减少请求开销,如果使用HTTP请求(一般是轮询发送),需要频繁发送请求,占用带宽并增加延迟。

反观WebSocket只需建立一次连接,即可持续接收数据,节省网络和计算资源。

WS对于多路订阅也方便,在同一连接上可以同时订阅多个品种的成交明细、盘口数据和K线数据,便于统一管理。

4.2 Python示例解析 (WebSocket)

下面的代码示例展示了如何使用WebSocket订阅实时行情数据,包括成交明细、盘口和K线数据。

import json import time import schedule import threading import websocket from loguru import logger class WebsocketExample: def __init__(self): self.session = None self.ws_url = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey" self.reconnecting = False self.is_ws_connected = False # 添加连接状态标志 def connect_all(self): """建立WebSocket连接并启动自动重连机制""" try: self.connect(self.ws_url) self.start_reconnection(self.ws_url) except Exception as e: logger.error(f"Failed to connect to {self.ws_url}: {str(e)}") def start_reconnection(self, url): """启动定时重连检查""" def check_connection(): if not self.is_connected(): logger.debug("Reconnection attempt...") self.connect(url) # 使用线程定期检查连接状态 schedule.every(10).seconds.do(check_connection) def run_scheduler(): while True: schedule.run_pending() time.sleep(1) threading.Thread(target=run_scheduler, daemon=True).start() def is_connected(self): """检查WebSocket连接状态""" return self.session and self.is_ws_connected def connect(self, url): """建立WebSocket连接""" try: if self.is_connected(): self.session.close() self.session = websocket.WebSocketApp( url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close ) # 启动WebSocket连接(非阻塞模式) threading.Thread(target=self.session.run_forever, daemon=True).start() except Exception as e: logger.error(f"Failed to connect to the server: {str(e)}") def on_open(self, ws): """WebSocket连接建立成功后的回调""" logger.info(f"Connection opened") self.is_ws_connected = True # 设置连接状态为True try: # 发送实时成交明细订阅请求 trade_send_obj = { "code": 10000, "trace": "01213e9d-90a0-426e-a380-ebed633cba7a", "data": {"codes": "BTCUSDT"} } self.send_message(trade_send_obj) # 不同请求之间间隔一段时间 time.sleep(5) # 发送实时盘口数据订阅请求 depth_send_obj = { "code": 10003, "trace": "01213e9d-90a0-426e-a380-ebed633cba7a", "data": {"codes": "BTCUSDT"} } self.send_message(depth_send_obj) # 不同请求之间间隔一段时间 time.sleep(5) # 发送实时K线数据订阅请求 kline_data = { "arr": [ { "type": 1, "codes": "BTCUSDT" } ] } kline_send_obj = { "code": 10006, "trace": "01213e9d-90a0-426e-a380-ebed633cba7a", "data": kline_data } self.send_message(kline_send_obj) # 启动定时心跳任务 schedule.every(30).seconds.do(self.ping) except Exception as e: logger.error(f"Error sending initial messages: {str(e)}") def on_message(self, ws, message): """接收消息的回调""" try: logger.info(f"Message received: {message}") except Exception as e: logger.error(f"Error processing message: {str(e)}") def on_close(self, ws, close_status_code, close_msg): """连接关闭的回调""" logger.info(f"Connection closed: {close_status_code} - {close_msg}") self.is_ws_connected = False # 设置连接状态为False def on_error(self, ws, error): """错误处理的回调""" logger.error(f"WebSocket error: {str(error)}") self.is_ws_connected = False # 发生错误时设置连接状态为False def send_message(self, message_obj): """发送消息到WebSocket服务器""" if self.is_connected(): try: self.session.send(json.dumps(message_obj)) except Exception as e: logger.error(f"Error sending message: {str(e)}") else: logger.warning("Cannot send message: Not connected") def ping(self): """发送心跳包""" ping_obj = { "code": 10010, "trace": "01213e9d-90a0-426e-a380-ebed633cba7a" } self.send_message(ping_obj) # 使用示例 if __name__ == "__main__": ws_client = WebsocketExample() ws_client.connect_all() # 保持主线程运行 try: while True: schedule.run_pending() time.sleep(1) except KeyboardInterrupt: logger.info("Exiting...") if ws_client.is_connected(): ws_client.session.close()

核心类WebsocketExample的功能包括:

1. 建立连接

connect_all():建立WebSocket连接,并启动定时重连机制。

connect(url):创建WebSocketApp,绑定回调函数,非阻塞运行。

2. 订阅数据

on_open(ws):连接建立成功后发送订阅请求:

  • 实时成交明细
  • 实时盘口数据
  • 实时K线数据

不同请求之间用time.sleep()控制间隔,防止请求过快。

3. 接收数据

on_message(ws, message):回调函数接收服务器推送的消息,并打印日志。

4. 连接管理

is_connected():检查连接状态。

start_reconnection(url):定时检查连接状态,如断开自动重连。

on_error/on_close:处理连接错误或关闭事件,更新状态标志。

5. 心跳机制

ping():定时发送心跳包,防止服务器断开空闲连接。

6. 发送消息

send_message(message_obj):封装JSON消息发送,并处理异常。

4.3 WebSocket使用中的常见问题及解决方案

1. 连接不稳定 / 自动断开

原因:网络波动或服务器空闲断开。

解决:增加心跳机制(如每30秒发送一次ping),并定时检查连接状态,必要时重连。

2. 消息处理阻塞

原因:回调函数执行耗时操作。

解决:在回调中尽量只做消息解析和入队处理,后续数据处理放到单独线程或队列。

3. 订阅请求过快导致失败

原因:一次性发送大量订阅消息。

解决:订阅请求之间加延迟(如 time.sleep(1~5秒)),或分批发送。

4. 网络异常导致发送失败

原因:连接断开但未检测。

解决:在发送前使用is_connected()检查状态,如果未连接则先重连。

5. 多线程和调度冲突

原因:同时使用WebSocket线程和定时任务调度器。

解决:保证调度器在单独守护线程中运行,WebSocket也在独立线程运行,避免阻塞主线程。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/29 18:45:52

长期投资在波动市场中的优势

长期投资在波动市场中的优势 关键词:长期投资、波动市场、投资优势、资产配置、复利效应 摘要:本文聚焦于长期投资在波动市场中的优势。通过深入剖析波动市场的特点以及长期投资的核心原理,从多个角度阐述了长期投资在应对市场波动时所展现出的独特优势。详细介绍了相关的数…

作者头像 李华
网站建设 2026/2/3 5:11:16

YOLOv11锚框设计调整:适应不同尺度目标检测

YOLOv11锚框设计调整:适应不同尺度目标检测 在智能交通系统中,一辆自动驾驶汽车需要同时识别远处的行人、近处的车辆以及空中悬停的无人机。这些目标尺寸差异巨大——从几十像素的小人影到占据画面三分之一的大卡车——对检测模型的多尺度感知能力提出了…

作者头像 李华
网站建设 2026/1/29 19:24:00

使用GitHub Pages搭建个人技术博客:分享PyTorch心得

使用GitHub Pages搭建个人技术博客:分享PyTorch心得 在深度学习领域,一个常见的困境是:你刚刚在网上找到一篇令人兴奋的教程,满心欢喜地准备复现结果,却卡在了环境配置的第一步——CUDA版本不匹配、PyTorch安装失败、…

作者头像 李华
网站建设 2026/2/1 6:29:59

Markdown+Jupyter:打造高质量技术博客输出体系

Markdown Jupyter:构建现代技术写作的高效闭环 在深度学习与数据科学日益普及的今天,一个模型能否被广泛理解、采纳甚至复现,早已不再仅仅取决于它的准确率高低。真正决定影响力的是——你如何讲清楚这个故事。从实验设计到结果分析&#xf…

作者头像 李华
网站建设 2026/1/31 14:12:01

Docker Volume持久化存储:保存PyTorch训练检查点

Docker Volume持久化存储:保存PyTorch训练检查点 在深度学习项目中,一次完整的模型训练往往需要数小时甚至数天。尤其是在使用大规模数据集或复杂网络结构时,任何意外中断都可能导致前功尽弃——GPU资源被白白消耗,实验进度归零。…

作者头像 李华
网站建设 2026/2/1 5:14:31

CUDA核心概念解析:理解PyTorch背后的GPU加速原理

CUDA核心概念解析:理解PyTorch背后的GPU加速原理 在深度学习模型日益庞大的今天,一次训练动辄需要处理数十亿参数和海量数据。面对如此繁重的计算任务,CPU那串行执行的架构显得捉襟见肘——你可能有过这样的体验:在一个中等规模的…

作者头像 李华