声明:本文为个人学习笔记,仅供技术交流,不构成任何投资建议。
一、前言
作为一个从业二十年的期货老兵,我深知实盘运维的重要性。策略写得再好,如果运维出问题,一样会亏钱。
今天这篇文章,我来分享一下量化交易日志与监控系统的搭建经验。
二、为什么需要日志和监控?
| 作用 | 说明 |
|---|---|
| 问题排查 | 出问题时能快速定位原因 |
| 策略复盘 | 记录交易决策过程 |
| 异常告警 | 及时发现和处理异常 |
| 合规审计 | 保留交易记录 |
三、日志系统设计
3.1 日志分类
| 日志类型 | 内容 | 级别 |
|---|---|---|
| 系统日志 | 启动、连接、异常 | INFO/ERROR |
| 交易日志 | 下单、成交、持仓 | INFO |
| 策略日志 | 信号、计算过程 | DEBUG |
| 风控日志 | 风控触发、仓位调整 | WARNING |
3.2 日志配置
importloggingfromlogging.handlersimportRotatingFileHandler,TimedRotatingFileHandlerfromdatetimeimportdatetimeimportosdefsetup_logging(log_dir='logs'):"""配置日志系统"""# 创建日志目录ifnotos.path.exists(log_dir):os.makedirs(log_dir)# 日志格式formatter=logging.Formatter('%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',datefmt='%Y-%m-%d %H:%M:%S')# 根日志器root_logger=logging.getLogger()root_logger.setLevel(logging.DEBUG)# 1. 控制台输出console_handler=logging.StreamHandler()console_handler.setLevel(logging.INFO)console_handler.setFormatter(formatter)root_logger.addHandler(console_handler)# 2. 交易日志(按天切割)trade_handler=TimedRotatingFileHandler(os.path.join(log_dir,'trade.log'),when='midnight',interval=1,backupCount=30,encoding='utf-8')trade_handler.setLevel(logging.INFO)trade_handler.setFormatter(formatter)trade_logger=logging.getLogger('trade')trade_logger.addHandler(trade_handler)# 3. 错误日志error_handler=RotatingFileHandler(os.path.join(log_dir,'error.log'),maxBytes=10*1024*1024,# 10MBbackupCount=5,encoding='utf-8')error_handler.setLevel(logging.ERROR)error_handler.setFormatter(formatter)root_logger.addHandler(error_handler)returnroot_logger# 使用logger=setup_logging()trade_logger=logging.getLogger('trade')logger.info("系统启动")trade_logger.info("下单 | SHFE.rb2505 | BUY | OPEN | 1手 | 3850")3.3 结构化日志
importjsonfromdatetimeimportdatetimeclassStructuredLogger:"""结构化日志记录器"""def__init__(self,log_file):self.log_file=log_filedeflog(self,event_type,**kwargs):"""记录结构化日志"""record={'timestamp':datetime.now().isoformat(),'event_type':event_type,**kwargs}withopen(self.log_file,'a',encoding='utf-8')asf:f.write(json.dumps(record,ensure_ascii=False)+'\n')deflog_order(self,symbol,direction,offset,volume,price):"""记录订单"""self.log('ORDER',symbol=symbol,direction=direction,offset=offset,volume=volume,price=price)deflog_trade(self,symbol,direction,price,volume,pnl):"""记录成交"""self.log('TRADE',symbol=symbol,direction=direction,price=price,volume=volume,pnl=pnl)deflog_signal(self,symbol,signal,reason):"""记录信号"""self.log('SIGNAL',symbol=symbol,signal=signal,reason=reason)# 使用示例slog=StructuredLogger('logs/structured.jsonl')slog.log_order('SHFE.rb2505','BUY','OPEN',1,3850)slog.log_signal('SHFE.rb2505','BUY','MA5上穿MA20')四、监控系统设计
4.1 监控指标
| 指标 | 说明 | 阈值 |
|---|---|---|
| 账户权益 | 实时权益 | - |
| 日盈亏 | 当日盈亏 | 亏损>3%告警 |
| 持仓 | 当前持仓 | 超限告警 |
| 系统状态 | 连接状态 | 断线告警 |
| 延迟 | 行情延迟 | >1秒告警 |
4.2 监控类实现
fromdatetimeimportdatetime,timeimportloggingclassTradingMonitor:"""交易监控器"""def__init__(self,config):self.config=config self.daily_start_balance=Noneself.last_heartbeat=datetime.now()self.logger=logging.getLogger('monitor')defon_day_start(self,balance):"""每日开始"""self.daily_start_balance=balance self.logger.info(f"交易日开始 | 初始权益:{balance:.2f}")defcheck_daily_pnl(self,current_balance):"""检查日盈亏"""ifself.daily_start_balanceisNone:returnTruepnl=current_balance-self.daily_start_balance pnl_pct=pnl/self.daily_start_balanceifpnl_pct<-self.config['daily_loss_limit']:self.logger.warning(f"日亏损告警 | 亏损:{pnl:.2f}({pnl_pct:.2%})")self.send_alert(f"日亏损告警:{pnl:.2f}({pnl_pct:.2%})")returnFalsereturnTruedefcheck_position(self,position,max_position):"""检查持仓"""total_pos=position.pos_long+position.pos_shortiftotal_pos>max_position:self.logger.warning(f"持仓超限 | 当前:{total_pos}上限:{max_position}")self.send_alert(f"持仓超限:{total_pos}/{max_position}")returnFalsereturnTruedefheartbeat(self):"""心跳检测"""self.last_heartbeat=datetime.now()defcheck_heartbeat(self,timeout_seconds=60):"""检查心跳"""elapsed=(datetime.now()-self.last_heartbeat).total_seconds()ifelapsed>timeout_seconds:self.logger.error(f"心跳超时 | 距上次心跳:{elapsed:.0f}秒")self.send_alert(f"策略心跳超时:{elapsed:.0f}秒")returnFalsereturnTruedefsend_alert(self,message):"""发送告警"""# 钉钉告警self.send_dingtalk(message)# 或微信告警# self.send_wechat(message)defsend_dingtalk(self,message):"""发送钉钉消息"""importrequests webhook=self.config.get('dingtalk_webhook')ifnotwebhook:returndata={"msgtype":"text","text":{"content":f"[量化告警]{message}"}}try:requests.post(webhook,json=data,timeout=5)exceptExceptionase:self.logger.error(f"钉钉发送失败:{e}")4.3 定时报告
fromdatetimeimportdatetime,timeimportscheduleclassReportGenerator:"""报告生成器"""def__init__(self,api):self.api=apidefgenerate_daily_report(self):"""生成每日报告"""account=self.api.get_account()report=f""" ========== 每日报告 ========== 日期:{datetime.now().strftime('%Y-%m-%d')}账户权益:{account.balance:.2f}可用资金:{account.available:.2f}持仓盈亏:{account.position_profit:.2f}当日盈亏:{account.close_profit:.2f}================================ """logging.info(report)returnreportdefschedule_reports(self):"""设置定时报告"""# 每日收盘后发送报告schedule.every().day.at("15:30").do(self.generate_daily_report)# 每小时发送状态schedule.every().hour.do(self.send_status)# 在主循环中运行# schedule.run_pending()五、完整监控系统示例
fromtqsdkimportTqApi,TqAuth,TqAccountimportloggingfromdatetimeimportdatetimeimportthreadingimporttime# 配置CONFIG={'broker':'期货公司','account':'资金账号','password':'密码','tq_user':'TQ账户','tq_pass':'TQ密码','symbol':'SHFE.rb2505','daily_loss_limit':0.03,'max_position':5,'dingtalk_webhook':'https://oapi.dingtalk.com/robot/send?access_token=xxx',}# 设置日志setup_logging()logger=logging.getLogger('main')# 初始化监控monitor=TradingMonitor(CONFIG)# 连接APIapi=TqApi(TqAccount(CONFIG['broker'],CONFIG['account'],CONFIG['password']),auth=TqAuth(CONFIG['tq_user'],CONFIG['tq_pass']))account=api.get_account()position=api.get_position(CONFIG['symbol'])# 初始化监控monitor.on_day_start(account.balance)logger.info("策略启动")# 主循环try:whileTrue:api.wait_update()# 心跳monitor.heartbeat()# 监控检查ifnotmonitor.check_daily_pnl(account.balance):logger.warning("触发日亏损限制")# 可以选择停止交易ifnotmonitor.check_position(position,CONFIG['max_position']):logger.warning("持仓超限")# 策略逻辑...exceptKeyboardInterrupt:logger.info("用户中断")exceptExceptionase:logger.error(f"异常:{e}")monitor.send_alert(f"策略异常:{e}")finally:api.close()logger.info("策略停止")六、运维最佳实践
6.1 日志管理
- 日志分级:DEBUG/INFO/WARNING/ERROR
- 日志切割:按天或按大小切割
- 日志保留:保留30天以上
- 定期清理:避免磁盘满
6.2 监控告警
- 分级告警:不同级别不同处理
- 告警去重:避免重复告警
- 告警升级:持续告警升级通知
- 告警测试:定期测试告警通道
6.3 故障恢复
- 自动重连:网络断开自动重连
- 状态恢复:重启后恢复状态
- 数据备份:定期备份配置和日志
七、我的运维配置
| 组件 | 选择 |
|---|---|
| 量化框架 | TqSdk |
| 日志库 | Python logging |
| 告警 | 钉钉机器人 |
| 监控 | 自建脚本 |
| 服务器 | 阿里云ECS |
选择TqSdk的原因是API简洁,容易集成日志和监控逻辑。
八、总结
量化交易运维的核心:
- 日志完善:记录所有关键信息
- 监控及时:异常第一时间发现
- 告警有效:告警信息准确、及时
- 定期复盘:分析日志优化策略
希望这篇文章对你有所帮助!
声明:本文基于个人学习经验整理,仅供技术交流参考,不构成任何投资建议。