news 2026/4/21 8:17:17

DataGen Connector本地造数神器(不用 Kafka 也能把 Pipeline 跑起来)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DataGen Connector本地造数神器(不用 Kafka 也能把 Pipeline 跑起来)

1、它到底做了什么

  • Source 并行运行:有多少个 source 并发子任务,就把Long的序列切成多少段(sub-sequence)
  • 你提供一个GeneratorFunction<Long, OUT>:把输入的 index(Long)映射成任意事件类型
  • 每个 subtask 内部有序,但全局顺序取决于并行度(parallelism)

一句话:Flink 负责发 index,你负责把 index 变成事件。

2、最小可跑示例:生成 0~999 的字符串

importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassDataGenDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();GeneratorFunction<Long,String>generator=index->"Number: "+index;longnumberOfRecords=1000;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,numberOfRecords,Types.STRING);DataStreamSource<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"Generator Source");stream.print();env.execute("datagen-demo");}}

要点:

  • 并行度为 1 时输出是严格"Number: 0""Number: 999"顺序
  • 并行度 > 1 时:每个 subtask 内部仍然按序,但不同 subtask 的结果交织输出

3、限速:控制总吞吐(全局每秒不超过 N 条)

importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.connector.datagen.source.RateLimiterStrategy;GeneratorFunction<Long,String>generator=index->"Number: "+index;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100),// 全部 source subtasks 加起来 <= 100 条/sTypes.STRING);

适用场景:

  • 你想模拟“上游流量”但又不想把本机打爆
  • 做算子性能对比、Backpressure 观察、checkpoint 行为观察

4、有界/无界:它“永远是 bounded”,但可以“看起来无界”

  • 语义上永远是 bounded(理论上会结束)
  • numberOfRecords = Long.MAX_VALUE基本等同“不会结束”(实践上像 unbounded)

建议:

  • 要跑有限数据:考虑 BATCH mode,更贴近离线回放
  • 要模拟持续输入:用Long.MAX_VALUE+ rate limit

5、容错语义:at-least-once / end-to-end exactly-once 能不能保证?

可以,但有个硬条件:

  • GeneratorFunction必须对输入 index 完全确定性
    也就是:同一个 index 永远生成同样的输出。

反例(会破坏确定性):

  • random()System.currentTimeMillis()、读外部可变配置、读网络请求结果

正确做法:

  • 用 index 推导数据(例如 hash(index) 生成用户、金额、状态)
  • 或者用固定 seed 的伪随机:new Random(index)(每个 index 固定)

6、Watermark:也可以在 Source 侧发“确定性水位线”

默认例子用noWatermarks(),但你完全可以:

  • 在生成事件里带 eventTime
  • 配合自定义WatermarkStrategy生成 deterministic watermarks
    适合做 event-time 窗口、乱序、迟到数据的测试演示。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/17 17:08:13

Dynamic Kafka Source不重启也能“动态切换集群/主题”

1. 核心概念&#xff1a;从“物理订阅”升级为“逻辑订阅” Dynamic Kafka Source 不是直接让你写 topics ["a", "b"]&#xff0c;而是让你订阅一个或多个 stream id&#xff1a; stream id&#xff1a;逻辑流名称&#xff0c;比如 "input-stream&…

作者头像 李华
网站建设 2026/4/17 23:46:35

小红书去水印工具:轻松保存高清无水印图片与视频

小红书已成为许多人分享生活、发现灵感的重要平台&#xff0c;无论是精美的摄影作品、有趣的短视频&#xff0c;还是实用的教程笔记&#xff0c;都让人忍不住想要收藏。然而&#xff0c;平台自带的水印有时会影响内容的观看与二次使用。为此&#xff0c;一款方便快捷的“小红书…

作者头像 李华
网站建设 2026/4/19 7:09:39

普洛斯集团任命赵明琪为普洛斯中国首席执行官

、美通社消息&#xff1a;普洛斯集团(GLP Pte Ltd)宣布任命赵明琪为普洛斯中国首席执行官。赵明琪将向全球首席执行官梅志明汇报&#xff0c;她领导的中国管理团队都有深厚行业积累&#xff0c;以保证业务的连续性&#xff0c;并共同推动未来的成功。普洛斯中国前常务副董事长诸…

作者头像 李华
网站建设 2026/4/15 15:45:05

“棋圣”聂卫平去世 享年74岁

九派新闻01-15 07:58:06记者从中国围棋协会获悉&#xff0c;中国围棋协会名誉主席、“棋圣”聂卫平九段昨晚在北京病逝&#xff0c;享年74岁。聂卫平是上世纪中国围棋振兴的关键人物&#xff0c;在八十年代的中日围棋擂台赛中&#xff0c;他作为主将力挽狂澜&#xff0c;连胜多…

作者头像 李华