news 2026/5/16 8:29:21

Flink 读文本文件TextLineInputFormat + FileSource(批/流一体)+ 目录持续监控

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink 读文本文件TextLineInputFormat + FileSource(批/流一体)+ 目录持续监控

1. 依赖准备:flink-connector-files

Java 工程要使用文本文件 Source,需要引入 Flink 的文件连接器依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>2.2.0</version></dependency>

PyFlink 用户通常可以直接在作业里使用(但如果你集群环境缺少对应 jar,也需要通过 Python dependency management 方式携带)。

2. 为什么用 FileSource + TextLineInputFormat

TextLineInputFormat 解决两件事:

  • 按行切分:每行一个 record
  • 字符集解码:使用InputStreamReader支持多种 charset(UTF-8、GBK 等)

而 FileSource 解决两件事:

  • bounded:一次性读完目录/文件(批处理)
  • continuous:持续监控目录,新文件出现就继续读(流式文件输入)

也就是说,你可以用同一套 Source,覆盖两类场景:

  • 离线回放历史日志
  • 实时消费不断落盘的新日志文件

3. 批处理模式(Bounded):读完就结束

目标:把一个文本文件(或目录里所有文本文件)的每一行读成String,生成DataStream<String>

因为文本行一般不自带事件时间,所以不需要 watermark:

finalFileSource<String>source=FileSource.forRecordStreamFormat(newTextLineInputFormat(),/* Flink Path */).build();finalDataStream<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"file-source");

适用场景:

  • 跑一次把历史文件处理完(ETL、离线修数、回放)

4. 流处理模式(Continuous):持续监控目录,新文件不断加入

目标:目录持续落文件(例如按小时切日志),Flink 任务一直跑,新文件出现就读,DataStream 会“无限增长”。

通过monitorContinuously(Duration)开启目录监控,比如每 1 秒扫描一次:

finalFileSource<String>source=FileSource.forRecordStreamFormat(newTextLineInputFormat(),/* Flink Path */).monitorContinuously(Duration.ofSeconds(1L)).build();finalDataStream<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"file-source");

适用场景:

  • 应用日志落盘目录(log rolling)
  • 上游系统定时导出文件到目录
  • 简易的“文件流”采集管道(没有 Kafka 也能跑)

5. 生产建议:文本文件“流式监控”最容易踩的坑

5.1 只监控“新文件”,不等于“追尾追加写”

大多数文件监控模式更适合“文件落地后不再变”(写完再 rename/commit)。如果你希望读一个不断追加的单文件(类似tail -f),要非常谨慎:有些文件系统/写入方式会导致重复读或读到半行。

推荐的落地方式:

  • 上游写临时文件(.tmp),写完后rename 成正式文件名
  • Flink 只消费正式文件名规则(例如不匹配.tmp

5.2 监控频率不是越小越好

monitorContinuously(1s)会频繁扫描目录:

  • 目录文件数大时会产生明显压力
  • 对对象存储(S3/OSS)类系统,list 成本更高

经验:

  • 本地/小目录:1s~5s 可以
  • 大目录/对象存储:10s~60s 起步,并控制目录分区层级(按日期/小时分层)

5.3 字符集与脏数据治理要提前考虑

TextLineInputFormat 基于InputStreamReader解码,编码不一致会出现乱码或异常。建议:

  • 统一上游编码(最好 UTF-8)
  • 对异常行做侧输出(side output)或打到 DLQ(如果你后续接 Kafka)

5.4 文本行没有事件时间时,watermark 怎么办

如果你的行里其实包含时间戳(比如日志行开头有2026-01-15 12:34:56),你可以在 map/flatMap 里解析事件时间,再配置 watermark 策略;否则默认 noWatermarks 没问题。

6. 一句话总结

  • TextLineInputFormat:把文件按“行”读成String,并处理字符集解码
  • FileSource:同一套代码支持批(bounded)与流(continuous 目录监控)
  • 批:.build()直接读完结束
  • 流:.monitorContinuously(Duration)目录新文件持续进入
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/16 12:53:46

让大模型更“懂”外部知识:RAG技术及未来发展综述

&#xff5c;引言 如何更好地结合外部数据&#xff0c;如何提升模型处理专业领域问题的可靠性&#xff0c;是大语言模型应用开发中值得不断思考的问题。针对此&#xff0c;微软亚洲研究院的研究员们提出了一种基于查询需求分层的 RAG 任务分类法&#xff0c;从显式事实、隐式事…

作者头像 李华
网站建设 2026/5/6 1:28:33

从单智能体到多智能体:九种模式教你搭建高效AI应用

想要构建一个智能体应用&#xff0c;最重要的是什么&#xff1f;可能很多人首先会想到要选择一个性能强大的大模型。这个回答没错&#xff0c;毕竟当前的LLM Based Agent哪能缺少LLM的支撑。但事实却是&#xff0c;很多基于先进大模型构建的智能体没能体现出应用效果&#xff0…

作者头像 李华
网站建设 2026/5/12 6:51:44

降重去 AI 双 buff 拉满!虎贲等考 AI 解锁论文 “隐形优化” 新姿势

当论文查重率飘红的焦虑&#xff0c;遇上 AIGC 检测的 “生死大考”&#xff0c;多少毕业生陷入 “改了又改&#xff0c;还是过不了关” 的死循环&#xff1f;市面上的降重工具要么是简单同义词替换&#xff0c;改完逻辑混乱&#xff1b;要么只能降重复率&#xff0c;AI 痕迹依…

作者头像 李华
网站建设 2026/5/10 20:08:16

【Java毕设源码分享】基于springboot+vue的智慧物业服务系统的设计与实现(程序+文档+代码讲解+一条龙定制)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/5/15 18:29:00

在CentOS上快速安装NVM和Node.js 14:完整指南与优化方案

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务) &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1;个人微信&a…

作者头像 李华