目录
- 摘要
- 一、横截面引擎概述
- 1.1 什么是横截面引擎
- 1.2 横截面引擎 vs 时间序列引擎
- 1.3 适用场景
- 二、创建横截面引擎
- 2.1 基本语法
- 2.2 创建简单引擎
- 2.3 创建带分组的引擎
- 三、触发模式
- 3.1 触发模式类型
- 3.2 每行触发
- 3.3 定时触发
- 四、聚合指标
- 4.1 基本统计
- 4.2 百分位统计
- 4.3 Top-N计算
- 五、实战案例
- 5.1 设备实时排名
- 5.2 实时异常检测
- 5.3 生产效率监控
- 六、引擎管理
- 6.1 查看引擎状态
- 6.2 删除引擎
- 6.3 引擎监控
- 七、性能优化
- 7.1 分组数量优化
- 7.2 触发模式选择
- 7.3 内存管理
- 八、总结
- 参考资料
摘要
本文深入讲解DolphinDB横截面引擎。从引擎原理到创建配置,从实时统计到Top-N计算,从分组分析到性能优化,全面介绍横截面引擎的核心功能。通过丰富的代码示例,帮助读者掌握实时统计分析的核心技能。
一、横截面引擎概述
1.1 什么是横截面引擎
横截面引擎是对同一时刻所有分组的数据进行聚合计算:
1.2 横截面引擎 vs 时间序列引擎
| 特性 | 时间序列引擎 | 横截面引擎 |
|---|---|---|
| 聚合维度 | 时间窗口内 | 同一时刻所有分组 |
| 输出频率 | 窗口结束 | 每条数据触发 |
| 适用场景 | 时间趋势 | 实时排名、整体统计 |
1.3 适用场景
| 场景 | 说明 |
|---|---|
| 实时排名 | 设备实时排名 |
| 整体统计 | 所有设备整体统计 |
| Top-N计算 | 实时Top-N |
| 异常检测 | 整体异常检测 |
二、创建横截面引擎
2.1 基本语法
//创建横截面引擎 agg=createCrossSectionalEngine("engine_name",//引擎名称 metrics,//聚合指标 outputTable,//输出表 timeColumn,//时间列[keyColumn],//分组列(可选)[triggeringPattern],//触发模式[triggeringInterval]//触发间隔)2.2 创建简单引擎
//创建输入流表 share streamTable(1:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,`timestamp`avg_temp`max_temp`min_temp`device_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建横截面引擎 agg=createCrossSectionalEngine("cs_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count]>,output_table,`timestamp)//订阅流表 subscribeTable(,"input_stream","cs_agg",-1,agg,true)2.3 创建带分组的引擎
//创建带分组的横截面引擎 agg=createCrossSectionalEngine("grouped_cs_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp]>,output_table,`timestamp,`device_id)//分组说明://-每个device_id维护最新值//-聚合计算所有device_id的最新值三、触发模式
3.1 触发模式类型
| 模式 | 说明 |
|---|---|
| perRow | 每条数据触发 |
| perBatch | 每批数据触发 |
| interval | 定时触发 |
3.2 每行触发
//每行触发:每条数据都触发计算 agg=createCrossSectionalEngine("per_row_engine",<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,"perRow")//每行触发3.3 定时触发
//定时触发:每隔N毫秒触发 agg=createCrossSectionalEngine("interval_engine",<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,"interval",1000)//每1秒触发四、聚合指标
4.1 基本统计
//基本统计指标 agg=createCrossSectionalEngine("basic_stats",<[avg(temperature)asavg_temp,sum(temperature)assum_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count,std(temperature)asstd_temp]>,output_table,`timestamp,`device_id)4.2 百分位统计
//百分位统计 agg=createCrossSectionalEngine("percentile_stats",<[percentile(temperature,50)asmedian,percentile(temperature,95)asp95,percentile(temperature,99)asp99]>,output_table,`timestamp,`device_id)4.3 Top-N计算
//Top-N计算:需要自定义函数//使用top函数获取Top-N//创建输出表 share table(1:0,`timestamp`top_devices`top_temps,[TIMESTAMP,STRING,STRING])astopn_table//创建引擎 agg=createCrossSectionalEngine("topn_engine",<[concat(string(device_id),",")astop_devices,concat(string(temperature),",")astop_temps]>,topn_table,`timestamp,`device_id)五、实战案例
5.1 设备实时排名
//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature`humidity,[INT,TIMESTAMP,DOUBLE,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`timestamp`avg_temp`max_temp`min_temp`device_count`std_temp,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE])asstats_table//==========3.创建横截面引擎==========agg=createCrossSectionalEngine("ranking_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count,std(temperature)asstd_temp]>,stats_table,`timestamp,`device_id,"interval",5000)//每5秒输出//==========4.订阅流表==========subscribeTable(,"sensor_stream","ranking_agg",-1,agg,true)//==========5.模拟数据==========defsimulateRanking(){for(iin1..100){sensor_stream.append!(table(1..100asdevice_id,take(now(),100)astimestamp,rand(20.0..30.0,100)astemperature,rand(40.0..60.0,100)ashumidity))sleep(1000)}}simulateRanking()//查看结果 select top20*fromstats_table5.2 实时异常检测
//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`timestamp`avg_temp`std_temp`threshold`anomaly_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asanomaly_table//==========3.创建异常检测引擎==========agg=createCrossSectionalEngine("anomaly_engine",<[avg(temperature)asavg_temp,std(temperature)asstd_temp,avg(temperature)+3*std(temperature)asthreshold,sum(iif(temperature>avg(temperature)+3*std(temperature),1,0))asanomaly_count]>,anomaly_table,`timestamp,`device_id,"interval",10000)//每10秒检测//==========4.订阅流表==========subscribeTable(,"sensor_stream","anomaly_agg",-1,agg,true)5.3 生产效率监控
//==========1.创建流表==========share streamTable(100000:0,`production_line`timestamp`output`efficiency,[SYMBOL,TIMESTAMP,DOUBLE,DOUBLE])asproduction_stream//==========2.创建输出表==========share table(1:0,`timestamp`total_output`avg_efficiency`max_efficiency`line_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asproduction_stats//==========3.创建监控引擎==========agg=createCrossSectionalEngine("production_engine",<[sum(output)astotal_output,avg(efficiency)asavg_efficiency,max(efficiency)asmax_efficiency,count(*)asline_count]>,production_stats,`timestamp,`production_line,"interval",60000)//每分钟统计//==========4.订阅流表==========subscribeTable(,"production_stream","production_agg",-1,agg,true)六、引擎管理
6.1 查看引擎状态
//查看所有引擎状态 getStreamEngineStat()//查看特定引擎 getStreamEngineStat("cs_engine")6.2 删除引擎
//删除引擎 dropStreamEngine("cs_engine")6.3 引擎监控
//引擎监控函数defmonitorCSEngine(){stat=getStreamEngineStat()for(rowinstat){if(row.type=="CrossSectionalAggregator"){print("横截面引擎: "+row.name)print(" 状态: "+row.status)print(" 分组数: "+string(row.numGroups))print(" 处理行数: "+string(row.processedRows))}}}monitorCSEngine()七、性能优化
7.1 分组数量优化
//分组数量建议//单引擎分组数<100000//如果分组数过多://1.使用多个引擎//2.增加触发间隔7.2 触发模式选择
| 场景 | 推荐触发模式 |
|---|---|
| 实时监控 | perRow |
| 周期统计 | interval |
| 批量处理 | perBatch |
7.3 内存管理
//横截面引擎内存使用//=分组数 × 每组数据大小//优化://1.减少分组数//2.使用过滤条件//3.定期清理不活跃分组八、总结
本文详细介绍了DolphinDB横截面引擎:
- 引擎原理:同一时刻所有分组聚合
- 创建方法:简单引擎、分组引擎
- 触发模式:每行触发、定时触发
- 聚合指标:基本统计、百分位、Top-N
- 实战应用:实时排名、异常检测、效率监控
- 性能优化:分组数量、触发模式、内存管理
思考题:
- 横截面引擎和时间序列引擎有什么区别?
- 如何选择合适的触发模式?
- 如何设计实时排名系统?
参考资料
- DolphinDB横截面引擎
- DolphinDB流计算