news 2026/4/20 17:45:20

Flink ML K-Means 离线聚类 + 在线增量聚类(mini-batch + decayFactor)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink ML K-Means 离线聚类 + 在线增量聚类(mini-batch + decayFactor)

一、K-Means(离线版):有限数据上的迭代聚类

1)输入列(Input Columns)

参数名类型默认值说明
featuresColVector"features"特征向量

2)输出列(Output Columns)

参数名类型默认值说明
predictionColInteger"prediction"预测所属簇 ID(簇中心编号)

3)参数(Parameters)详解

KMeansModel(预测侧)参数
Key默认值类型说明
distanceMeasureEuclideanDistanceMeasure.NAMEString距离度量(当前支持欧式距离)
featuresCol"features"String特征列名
predictionCol"prediction"String输出列名
k2Integer簇数量(最大聚类数)
KMeans(训练侧)额外参数
Key默认值类型说明
initMode"random"String初始化方式(当前支持 random)
seednullLong随机种子(保证可复现)
maxIter20Integer最大迭代次数

4)Java 示例代码解读(离线 KMeans)

示例流程很标准:

  1. 构造输入数据(DenseVector 流)
  2. DataStream → Table,并命名为features
  3. kmeans.fit(table)训练得到KMeansModel
  4. model.transform(table)输出每条数据的簇 ID
  5. collect 打印 features + clusterId

关键代码片段:

DataStream<DenseVector>inputStream=env.fromElements(Vectors.dense(0.0,0.0),Vectors.dense(0.0,0.3),Vectors.dense(0.3,0.0),Vectors.dense(9.0,0.0),Vectors.dense(9.0,0.6),Vectors.dense(9.6,0.0));TableinputTable=tEnv.fromDataStream(inputStream).as("features");KMeanskmeans=newKMeans().setK(2).setSeed(1L);KMeansModelkmeansModel=kmeans.fit(inputTable);TableoutputTable=kmeansModel.transform(inputTable)[0];

输出打印(预测列是 Integer):

DenseVectorfeatures=(DenseVector)row.getField(kmeans.getFeaturesCol());intclusterId=(Integer)row.getField(kmeans.getPredictionCol());

二、Online K-Means:无界流上的持续聚类(mini-batch + 遗忘)

1)为什么需要 Online K-Means?

离线 KMeans 训练出来的中心是“固定”的。
但很多业务数据分布会随时间变化,例如:

  • 用户行为习惯变了
  • 商品/内容热点变化
  • 流量来源变化

这时你希望模型能“持续学习”,让聚类中心跟着数据漂移而更新,就需要 Online K-Means。

2)Online K-Means 的核心思想(mini-batch + decayFactor)

Online K-Means 基于“mini-batch KMeans”的更新规则,并加入遗忘机制(decay):

  • 每次从训练流中积累一个 mini-batch
  • 基于这个 batch 计算临时中心(estimated centroids)
  • 用加权平均更新旧中心(original centroids):

decayFactor 解释(非常关键)

  • decayFactor = 1:历史与新数据同等重要(几乎不遗忘)
  • decayFactor = 0:完全由最新数据决定中心(强遗忘)
  • 值越小 → 遗忘越强 → 模型越“跟新”
  • 值越大 → 趋于稳定 → 变化越慢

3)输入输出列(Online)

输入列同离线:

参数名类型默认值说明
featuresColVector"features"特征向量

输出列同离线:

参数名类型默认值说明
predictionColInteger"prediction"所属簇 ID

4)参数(OnlineKMeans)详解

OnlineKMeansModel(预测侧)
Key默认值类型说明
distanceMeasureEuclideanDistanceMeasure.NAMEString距离度量(欧式距离)
featuresCol"features"String特征列名
predictionCol"prediction"String输出列名
k2Integer簇数量
OnlineKMeans(训练侧)额外参数
Key默认值类型说明
batchStrategyCOUNT_STRATEGYStringmini-batch 构造策略
globalBatchSize32Integer全局 batch 大小
decayFactor0.0Double遗忘系数(历史中心贡献缩放)
seednullLong随机种子

5)Java 示例代码解读(OnlineKMeans)

示例里做了非常“演示型”的设计:训练数据分两段周期性出现,观察聚类结果如何随时间变化。

(1)训练流是无限流,周期性吐两批不同分布的数据
  • trainData1:大致在 (0~10) 附近
  • trainData2:分布跳到了 (10,100) 与 (-10,-100) 两块

这等于让数据分布发生“漂移”,你就能看到在线聚类中心被新数据影响。

(2)predict 也是周期性吐同一组预测点
List<Row>predictData=Arrays.asList(Row.of(Vectors.dense(10.0,10.0)),Row.of(Vectors.dense(-10.0,10.0)));

输出里会不停打印:

  • 两个点是否被分到同一个簇
    因为随着训练数据改变、中心改变,聚类结果可能随时间变化。
(3)初始化模型数据 initialModelData

在线聚类必须有初始中心,否则没法开始迭代。示例使用:

.setInitialModelData(KMeansModelData.generateRandomModelData(tEnv,2,2,0.0,0))

含义是:

  • k=2 个中心
  • 每个中心 2 维
  • 随机生成初始中心
(4)globalBatchSize=6:每 6 条数据更新一次中心
.setGlobalBatchSize(6)

这与训练数据每批 6 条刚好对应,便于演示“每批更新一次”的效果。

三、离线 KMeans vs 在线 OnlineKMeans:怎么选?

选离线 KMeans 的典型场景

  • 你有明确的历史数据窗口(按天、按周)
  • 模型周期性训练发布,追求稳定可控
  • 线上只是推理(transform),不希望训练影响延迟

选 OnlineKMeans 的典型场景

  • 数据持续流入且分布变化快
  • 你希望模型能持续适应新模式(概念漂移)
  • 你可以接受聚类结果随时间变化

四、实战建议(非常重要)

1)KMeans 之前强烈建议做标准化

KMeans 基于距离(欧式距离),特征尺度不同会导致“某个维度支配聚类”。典型做法:

  • VectorAssembler(拼特征)
  • StandardScaler(标准化)
  • KMeans / OnlineKMeans

2)k 的选择不要拍脑袋

常见方法:

  • 肘部法(Elbow)
  • 轮廓系数(Silhouette)
  • 结合业务可解释性(比如用户分群常选 5/8/10)

3)OnlineKMeans 的 decayFactor 是控制“跟新程度”的旋钮

简单经验:

  • 数据分布很稳定:decayFactor 接近 1
  • 数据漂移明显:decayFactor 取 0.1~0.5 让模型更灵活
  • 想快速跟随热点:decayFactor 更小

4)batch size 与更新频率要结合吞吐与稳定性

  • batch 小:更新快但抖动大
  • batch 大:更稳定但响应慢

五、小结

Flink ML 的 KMeans 家族可以覆盖绝大多数“聚类/分群”需求:

  • KMeans(离线):有限数据、迭代训练、中心稳定
  • OnlineKMeans(在线):无界流、mini-batch 更新、支持遗忘机制

掌握了k / maxIter / globalBatchSize / decayFactor这些关键参数,你就能把聚类从“demo”落到“线上可用”。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/20 17:15:00

无需手动编译!PyTorch-CUDA基础镜像一键启动AI项目

无需手动编译&#xff01;PyTorch-CUDA基础镜像一键启动AI项目 在深度学习项目开发中&#xff0c;最让人头疼的往往不是模型设计或调参&#xff0c;而是环境配置——“为什么代码在我机器上跑得好好的&#xff0c;换台设备就报错&#xff1f;”这种问题几乎成了每个AI工程师都经…

作者头像 李华
网站建设 2026/4/17 20:19:33

移动测试的变革与工具选型挑战

在设备碎片化&#xff08;Android超3万种设备型号&#xff09;和iOS/Android双平台迭代加速的背景下&#xff0c;2025年移动测试工具已从单一功能向AI驱动的全链路解决方案进化。本文基于全球Top 500移动团队的实践反馈&#xff0c;精选10款必备工具&#xff0c;覆盖自动化、云…

作者头像 李华
网站建设 2026/4/18 10:03:42

三菱 FX3U 电机转速与频率互转 FB 功能块实战分享

三菱FX3U 电机转速与频率互转FB功能块实际项目中的应用&#xff0c;做成fb块出给有需要的朋友。程序分三种情况&#xff0c;一是直接转换&#xff0c;二是使用减速机情况下的速度频率转换&#xff0c;三是使用皮带轮情况下的速度频率转换。 更多使用场景可以探讨。把换算封装成…

作者头像 李华
网站建设 2026/4/20 10:13:05

【计算机毕业设计案例】基于SpringBoot的供应链管理系统的设计与实现基于SpringBoot的粮食供应链管理系统的设计与实现(程序+文档+讲解+定制)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/4/20 11:45:22

Java毕设项目:基于SpringBoot的粮食供应链管理系统的设计与实现(源码+文档,讲解、调试运行,定制等)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/4/20 2:00:29

PyTorch 2.6版本新特性解析 + CUDA 12支持实测性能提升

PyTorch 2.6 CUDA 12&#xff1a;性能跃迁与容器化开发新范式 在高端 GPU 日益普及的今天&#xff0c;一个令人尴尬的现象依然普遍存在&#xff1a;许多深度学习项目在 A100 或 H100 上跑出的训练吞吐&#xff0c;甚至还不如理论峰值的 60%。问题往往不在于模型设计&#xff0…

作者头像 李华