news 2026/5/29 20:04:19

DolphinDB横截面引擎:实时统计分析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DolphinDB横截面引擎:实时统计分析

目录

    • 摘要
    • 一、横截面引擎概述
      • 1.1 什么是横截面引擎
      • 1.2 横截面引擎 vs 时间序列引擎
      • 1.3 适用场景
    • 二、创建横截面引擎
      • 2.1 基本语法
      • 2.2 创建简单引擎
      • 2.3 创建带分组的引擎
    • 三、触发模式
      • 3.1 触发模式类型
      • 3.2 每行触发
      • 3.3 定时触发
    • 四、聚合指标
      • 4.1 基本统计
      • 4.2 百分位统计
      • 4.3 Top-N计算
    • 五、实战案例
      • 5.1 设备实时排名
      • 5.2 实时异常检测
      • 5.3 生产效率监控
    • 六、引擎管理
      • 6.1 查看引擎状态
      • 6.2 删除引擎
      • 6.3 引擎监控
    • 七、性能优化
      • 7.1 分组数量优化
      • 7.2 触发模式选择
      • 7.3 内存管理
    • 八、总结
    • 参考资料

摘要

本文深入讲解DolphinDB横截面引擎。从引擎原理到创建配置,从实时统计到Top-N计算,从分组分析到性能优化,全面介绍横截面引擎的核心功能。通过丰富的代码示例,帮助读者掌握实时统计分析的核心技能。


一、横截面引擎概述

1.1 什么是横截面引擎

横截面引擎是对同一时刻所有分组的数据进行聚合计算:

横截面引擎

时刻T

设备1数据

设备2数据

设备3数据

横截面聚合

整体统计结果

1.2 横截面引擎 vs 时间序列引擎

特性时间序列引擎横截面引擎
聚合维度时间窗口内同一时刻所有分组
输出频率窗口结束每条数据触发
适用场景时间趋势实时排名、整体统计

1.3 适用场景

场景说明
实时排名设备实时排名
整体统计所有设备整体统计
Top-N计算实时Top-N
异常检测整体异常检测

二、创建横截面引擎

2.1 基本语法

//创建横截面引擎 agg=createCrossSectionalEngine("engine_name",//引擎名称 metrics,//聚合指标 outputTable,//输出表 timeColumn,//时间列[keyColumn],//分组列(可选)[triggeringPattern],//触发模式[triggeringInterval]//触发间隔)

2.2 创建简单引擎

//创建输入流表 share streamTable(1:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,`timestamp`avg_temp`max_temp`min_temp`device_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建横截面引擎 agg=createCrossSectionalEngine("cs_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count]>,output_table,`timestamp)//订阅流表 subscribeTable(,"input_stream","cs_agg",-1,agg,true)

2.3 创建带分组的引擎

//创建带分组的横截面引擎 agg=createCrossSectionalEngine("grouped_cs_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp]>,output_table,`timestamp,`device_id)//分组说明://-每个device_id维护最新值//-聚合计算所有device_id的最新值

三、触发模式

3.1 触发模式类型

模式说明
perRow每条数据触发
perBatch每批数据触发
interval定时触发

3.2 每行触发

//每行触发:每条数据都触发计算 agg=createCrossSectionalEngine("per_row_engine",<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,"perRow")//每行触发

3.3 定时触发

//定时触发:每隔N毫秒触发 agg=createCrossSectionalEngine("interval_engine",<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,"interval",1000)//1秒触发

四、聚合指标

4.1 基本统计

//基本统计指标 agg=createCrossSectionalEngine("basic_stats",<[avg(temperature)asavg_temp,sum(temperature)assum_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count,std(temperature)asstd_temp]>,output_table,`timestamp,`device_id)

4.2 百分位统计

//百分位统计 agg=createCrossSectionalEngine("percentile_stats",<[percentile(temperature,50)asmedian,percentile(temperature,95)asp95,percentile(temperature,99)asp99]>,output_table,`timestamp,`device_id)

4.3 Top-N计算

//Top-N计算:需要自定义函数//使用top函数获取Top-N//创建输出表 share table(1:0,`timestamp`top_devices`top_temps,[TIMESTAMP,STRING,STRING])astopn_table//创建引擎 agg=createCrossSectionalEngine("topn_engine",<[concat(string(device_id),",")astop_devices,concat(string(temperature),",")astop_temps]>,topn_table,`timestamp,`device_id)

五、实战案例

5.1 设备实时排名

//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature`humidity,[INT,TIMESTAMP,DOUBLE,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`timestamp`avg_temp`max_temp`min_temp`device_count`std_temp,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE])asstats_table//==========3.创建横截面引擎==========agg=createCrossSectionalEngine("ranking_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count,std(temperature)asstd_temp]>,stats_table,`timestamp,`device_id,"interval",5000)//5秒输出//==========4.订阅流表==========subscribeTable(,"sensor_stream","ranking_agg",-1,agg,true)//==========5.模拟数据==========defsimulateRanking(){for(iin1..100){sensor_stream.append!(table(1..100asdevice_id,take(now(),100)astimestamp,rand(20.0..30.0,100)astemperature,rand(40.0..60.0,100)ashumidity))sleep(1000)}}simulateRanking()//查看结果 select top20*fromstats_table

5.2 实时异常检测

//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`timestamp`avg_temp`std_temp`threshold`anomaly_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asanomaly_table//==========3.创建异常检测引擎==========agg=createCrossSectionalEngine("anomaly_engine",<[avg(temperature)asavg_temp,std(temperature)asstd_temp,avg(temperature)+3*std(temperature)asthreshold,sum(iif(temperature>avg(temperature)+3*std(temperature),1,0))asanomaly_count]>,anomaly_table,`timestamp,`device_id,"interval",10000)//10秒检测//==========4.订阅流表==========subscribeTable(,"sensor_stream","anomaly_agg",-1,agg,true)

5.3 生产效率监控

//==========1.创建流表==========share streamTable(100000:0,`production_line`timestamp`output`efficiency,[SYMBOL,TIMESTAMP,DOUBLE,DOUBLE])asproduction_stream//==========2.创建输出表==========share table(1:0,`timestamp`total_output`avg_efficiency`max_efficiency`line_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asproduction_stats//==========3.创建监控引擎==========agg=createCrossSectionalEngine("production_engine",<[sum(output)astotal_output,avg(efficiency)asavg_efficiency,max(efficiency)asmax_efficiency,count(*)asline_count]>,production_stats,`timestamp,`production_line,"interval",60000)//每分钟统计//==========4.订阅流表==========subscribeTable(,"production_stream","production_agg",-1,agg,true)

六、引擎管理

6.1 查看引擎状态

//查看所有引擎状态 getStreamEngineStat()//查看特定引擎 getStreamEngineStat("cs_engine")

6.2 删除引擎

//删除引擎 dropStreamEngine("cs_engine")

6.3 引擎监控

//引擎监控函数defmonitorCSEngine(){stat=getStreamEngineStat()for(rowinstat){if(row.type=="CrossSectionalAggregator"){print("横截面引擎: "+row.name)print(" 状态: "+row.status)print(" 分组数: "+string(row.numGroups))print(" 处理行数: "+string(row.processedRows))}}}monitorCSEngine()

七、性能优化

7.1 分组数量优化

//分组数量建议//单引擎分组数<100000//如果分组数过多://1.使用多个引擎//2.增加触发间隔

7.2 触发模式选择

场景推荐触发模式
实时监控perRow
周期统计interval
批量处理perBatch

7.3 内存管理

//横截面引擎内存使用//=分组数 × 每组数据大小//优化://1.减少分组数//2.使用过滤条件//3.定期清理不活跃分组

八、总结

本文详细介绍了DolphinDB横截面引擎:

  1. 引擎原理:同一时刻所有分组聚合
  2. 创建方法:简单引擎、分组引擎
  3. 触发模式:每行触发、定时触发
  4. 聚合指标:基本统计、百分位、Top-N
  5. 实战应用:实时排名、异常检测、效率监控
  6. 性能优化:分组数量、触发模式、内存管理

思考题

  1. 横截面引擎和时间序列引擎有什么区别?
  2. 如何选择合适的触发模式?
  3. 如何设计实时排名系统?

参考资料

  • DolphinDB横截面引擎
  • DolphinDB流计算
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/29 20:04:16

基于TFLite的端侧语音表征模型:FRILL项目实战与优化指南

1. 项目概述&#xff1a;为什么我们需要在设备端进行语音表征&#xff1f;最近几年&#xff0c;语音交互已经渗透到我们生活的方方面面&#xff0c;从智能音箱到车载系统&#xff0c;再到手机上的语音助手。但不知道你有没有发现一个现象&#xff1a;很多语音功能&#xff0c;尤…

作者头像 李华
网站建设 2026/5/29 20:01:59

ESP32驱动CRT电视板与SHARP TFT屏:模拟视频系统改造全解析

1. 项目概述与核心思路几年前&#xff0c;我在一个旧货市场淘到了一台已经无法开机的CRT电视。拆开外壳后&#xff0c;那块布满灰尘、焊点发黄的主板并没有被我直接丢弃&#xff0c;反而让我萌生了一个想法&#xff1a;能否将这块“古董”主板与现代的微控制器结合&#xff0c;…

作者头像 李华
网站建设 2026/5/29 20:01:20

从RC电路到Buck电源:手把手教你用Simulink搭建环路模型并验证传递函数

从RC电路到Buck电源&#xff1a;手把手教你用Simulink搭建环路模型并验证传递函数在开关电源设计中&#xff0c;环路稳定性是决定系统可靠性的核心指标之一。许多初学者面对波特图、相位裕度等概念时常常感到抽象难懂&#xff0c;而传统教材中复杂的数学推导更是让人望而生畏。…

作者头像 李华
网站建设 2026/5/29 19:59:23

CXL异构内存中树形索引的层级感知优化

1. 项目概述&#xff1a;CXL异构内存中的树形索引优化在当今数据中心和云计算环境中&#xff0c;内存访问性能已成为系统瓶颈的关键因素。随着CXL&#xff08;Compute Express Link&#xff09;协议的普及&#xff0c;异构内存架构&#xff08;如DRAMCXL内存的组合&#xff09;…

作者头像 李华