1. State Processor API 能解决什么问题
典型用法(都是真实生产会遇到的):
状态审计/验收:对线上作业打一个 savepoint,用批作业读出来做一致性校验
状态修复:修掉异常 key、纠正不一致 entries、清理脏数据
状态引导(bootstrap):从离线历史数据构造 state,写成 savepoint,给流作业冷启动
作业演进不丢状态:
- 修改 state 的数据类型(兼容/迁移)
- 调整算子最大并行度(maxParallelism)
- 拆分/合并 operator state
- 重分配 operator UID(或从 UID hash 迁移到 UID)
依赖(Flink 2.2.0 示例):
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api</artifactId><version>2.2.0</version></dependency>2. 心智模型:Savepoint 就是一座“数据库”
理解它,你基本就通了。
一个 Flink Job 由多个 operator 组成(Src/Proc/Snk)
每个 operator 可能有两类状态:
- Operator State:算子级别,按 subtask 组织,常见 ListState/UnionListState/BroadcastState
- Keyed State:按 key 分区的状态,像分布式 KV(ValueState/ListState/MapState/AggregatingState…)
State Processor API 会把一个 savepoint 映射成“数据库”:
- 每个 operator(用 UID 标识)是一个 namespace
- 每个 operator state映射成单列表(所有 subtasks 的 list entry 汇总)
- 同一个 operator 的所有 keyed states合并到一张表:
key一列 + 每个 keyed state 一个列(同 key 的不同 state 并在一行)
这也解释了为什么 Table/SQL 很适合做状态分析:它天然就是在查表。
3. 先把“算子识别”做好:UID 优先,hash 兜底
State Processor API 通过OperatorIdentifier定位算子:
- 最推荐:
OperatorIdentifier.forUid("my-uid") - UID 不可用时(历史作业没设置 UID):
OperatorIdentifier.forUidHash("...")
工程建议:生产作业务必显式.uid("xxx"),否则后面迁移/修状态会非常痛苦。
4. DataStream API 读状态:SavepointReader
读取的第一步:给出 savepoint/checkpoint 路径 +与原作业一致的 StateBackend(兼容性规则与正常恢复一致)。
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();SavepointReadersp=SavepointReader.read(env,"hdfs://path/",newHashMapStateBackend());4.1 读 Operator List State
对应作业里getListState(new ListStateDescriptor<>("list-state", ...))写出的状态:
DataStream<Integer>listState=sp.readListState(OperatorIdentifier.forUid("my-uid"),"list-state",Types.INT);4.2 读 Union List State
对应getUnionListState:读取时会返回“等价于并行度 1 的单份状态”。
DataStream<Integer>unionState=sp.readUnionState(OperatorIdentifier.forUid("my-uid"),"union-state",Types.INT);4.3 读 Broadcast State
BroadcastState 读取同样是“单份副本”语义:
DataStream<Tuple2<Integer,Integer>>bc=sp.readBroadcastState(OperatorIdentifier.forUid("my-uid"),"broadcast-state",Types.INT,Types.INT);4.4 自定义序列化器
如果原 StateDescriptor 用了自定义TypeSerializer,读取也要对应指定:
DataStream<Integer>listState=sp.readListState(OperatorIdentifier.forUid("uid"),"list-state",Types.INT,newMyCustomIntSerializer());5. DataStream API 读 Keyed State:KeyedStateReaderFunction(最强也最容易踩坑)
Keyed State 的读取入口是:
DataStream<Out>ds=sp.readKeyedState(OperatorIdentifier.forUid("my-uid"),newMyReaderFunction());你需要实现KeyedStateReaderFunction<KeyType, OutType>,在open()里注册你要读的各种 state descriptor,然后在readKey()中针对每个 key 输出一条(或多条)结果。
示例:读取ValueState<Integer>+ListState<Long>:
publicstaticclassKeyedState{publicintkey;publicintvalue;publicList<Long>times;}publicstaticclassReaderFunctionextendsKeyedStateReaderFunction<Integer,KeyedState>{privateValueState<Integer>state;privateListState<Long>updateTimes;@Overridepublicvoidopen(OpenContextopenContext){state=getRuntimeContext().getState(newValueStateDescriptor<>("state",Types.INT));updateTimes=getRuntimeContext().getListState(newListStateDescriptor<>("times",Types.LONG));}@OverridepublicvoidreadKey(Integerkey,Contextctx,Collector<KeyedState>out)throwsException{KeyedStatedata=newKeyedState();data.key=key;data.value=state.value();data.times=StreamSupport.stream(updateTimes.get().spliterator(),false).collect(Collectors.toList());out.collect(data);}}关键坑点(非常重要):
- 所有 state descriptor 必须在
open()里“提前注册”
文档明确说:在readKey()里再调用getRuntimeContext().get*State会直接抛RuntimeException。 Context还能访问该 key 的元数据:event time / processing time timers(适合做诊断)
6. 读 Window State:读窗口聚合结果 + 定时器
State Processor API 支持读取窗口算子状态,适合排查“窗口聚合对不对、定时器是否异常”等。
使用方式:指定 window assigner + 聚合函数 + 可选WindowReaderFunction进行“富化输出”。
示例:每分钟按 userId 统计点击数的窗口聚合,读出count、window、trigger timers:
savepoint.window(TumblingEventTimeWindows.of(Duration.ofMinutes(1))).aggregate("click-window",newClickCounter(),newClickReader(),Types.STRING,Types.INT,Types.INT).print();并且在WindowReaderFunction的 context 中还能读 trigger state(CountTrigger 或自定义 trigger 的状态)。
7. 写 Savepoint:SavepointWriter + BootstrapTransformation(用离线数据“造状态”)
写 savepoint 的核心用途:bootstrap。比如你要让新作业上线时直接带着历史累计值,而不是从 0 开始。
注意:写 savepoint 的程序必须是BATCH 执行。
7.1 基本写法:newSavepoint + withOperator
intmaxParallelism=128;SavepointWriter.newSavepoint(env,newHashMapStateBackend(),maxParallelism).withOperator(OperatorIdentifier.forUid("uid1"),transformation1).withOperator(OperatorIdentifier.forUid("uid2"),transformation2).write(savepointPath);这里最关键的是:uid1/uid2必须和未来要恢复的 DataStream 作业中算子的.uid("...")一一对应,否则恢复不了。
7.2 写 Operator State:StateBootstrapFunction
适合CheckpointedFunction里用的 operator list state。
publicclassSimpleBootstrapFunctionextendsStateBootstrapFunction<Integer>{privateListState<Integer>state;@OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{state=context.getOperatorState().getListState(newListStateDescriptor<>("state",Types.INT));}@OverridepublicvoidprocessElement(Integervalue,Contextctx)throwsException{state.add(value);}}构造 transformation:
StateBootstrapTransformationt=OperatorTransformation.bootstrapWith(env.fromElements(1,2,3)).transform(newSimpleBootstrapFunction());7.3 写 Broadcast State:BroadcastStateBootstrapFunction
Broadcast state 要求“全量能放进内存”,和流作业的广播语义一致。
publicclassCurrencyBootstrapFunctionextendsBroadcastStateBootstrapFunction<CurrencyRate>{publicstaticfinalMapStateDescriptor<String,Double>descriptor=newMapStateDescriptor<>("currency-rates",Types.STRING,Types.DOUBLE);@OverridepublicvoidprocessElement(CurrencyRatev,Contextctx)throwsException{ctx.getBroadcastState(descriptor).put(v.currency,v.rate);}}7.4 写 Keyed State:KeyedStateBootstrapFunction(还能设置 timers)
publicclassAccountBootstrapperextendsKeyedStateBootstrapFunction<Integer,Account>{privateValueState<Double>total;@Overridepublicvoidopen(OpenContextopenContext){total=getRuntimeContext().getState(newValueStateDescriptor<>("total",Types.DOUBLE));}@OverridepublicvoidprocessElement(Accountvalue,Contextctx)throwsException{total.update(value.amount);}}组装 transformation:
StateBootstrapTransformation<Account>t=OperatorTransformation.bootstrapWith(accountDataSet).keyBy(acc->acc.id).transform(newAccountBootstrapper());定时器注意点:
- bootstrap 函数里设置的 timers不会在 bootstrap 过程中触发
- 恢复到流作业后才会激活
- 如果设置了 processing time timer,但恢复时刻已晚于触发时间,则会在作业启动后立刻触发
- 文档强调:如果 bootstrap 创建 timers,恢复端必须用 process 类型算子(process function family)
7.5 写 Window State:必须严格匹配原窗口配置
写窗口状态时,bootstrap 侧的窗口 assigner/trigger/evictor/聚合逻辑要与未来流作业完全一致,否则恢复语义会对不上。
8. 基于已有 Savepoint 修改:fromExistingSavepoint(增量补状态)
常见场景:老作业有 savepoint,你只想给新加的算子补一份 state,不动别的。
SavepointWriter.fromExistingSavepoint(env,oldPath,newHashMapStateBackend()).withOperator(OperatorIdentifier.forUid("uid"),transformation).write(newPath);9. 改 UID 或 UID hash:救命技能
当历史作业没显式 UID 时,你可能只能从日志里拿到 uid hash,此时可以先把 hash 映射成可控 UID:
savepointWriter.changeOperatorIdentifier(OperatorIdentifier.forUidHash("2feb7f8bcc404c3ac8a981959780bd78"),OperatorIdentifier.forUid("new-uid"));或者直接替换旧 UID 为新 UID(算子重命名/重构时很常见):
savepointWriter.changeOperatorIdentifier(OperatorIdentifier.forUid("old-uid"),OperatorIdentifier.forUid("new-uid"));10. Table/SQL 读状态:把 Savepoint 当表查(只支持 keyed state)
如果你更喜欢 SQL(或者想给运维/数据同学一个可读的排查方式),State Table API 很香。
重要限制:State Table API 只支持 keyed state。
10.1 读元信息:savepoint_metadata
LOADMODULE state;SELECT*FROMsavepoint_metadata('/root/dir/of/checkpoint-data/chk-1');它会告诉你 checkpoint id、operator uid、uid hash、并行度、max parallelism、各类 state size 等信息,定位问题非常快。
10.2 建表读取 keyed state:savepoint connector
CREATETABLEstate_table(kINTEGER,MyValueStateINTEGER,MyAccountValueStateROW<idINTEGER,amountDOUBLE>,MyListState ARRAY<INTEGER>,MyMapState MAP<INTEGER,INTEGER>,MyAvroStateROW<longDataBIGINT>,PRIMARYKEY(k)NOTENFORCED)WITH('connector'='savepoint','state.backend.type'='rocksdb','state.path'='/root/dir/of/checkpoint-data/chk-1','operator.uid'='my-uid','fields.MyAvroState.value-type-factory'='org.apache.flink.state.table.AvroSavepointTypeInformationFactory');然后你就可以:
SELECTk,MyValueStateFROMstate_tableWHEREk=42;SELECTCOUNT(*)FROMstate_table;SELECTk,CARDINALITY(MyListState)ASlist_lenFROMstate_table;几个实战点:
state.backend.type必须与原作业一致(hashmap/rocksdb)operator.uid与operator.uid.hash二选一- state name 不符合列名时,用
fields.#.state-name覆盖 - MapState 的 key/value 类型推断不准时,用
fields.#.key-class/value-class或 type factory 明确指定 - Avro 这类复杂类型,常需要
value-type-factory(文档给了SavepointTypeInformationFactory的套路)
10.3 默认类型映射与“快捷查看”
SQL connector 会对基础类型做默认映射。还有个很实用的“快捷方式”:
- 如果你把一个复杂 Java 类映射成
STRING列,那么 SQL 读出来就是该对象toString()的结果
适合“先看个大概、快速解释性查询”,排障很省事。
11. 生产最佳实践清单
- 作业里给关键算子统一规划 UID(强烈建议)
- Source/关键处理算子/Sink 都显式
.uid("...")
- 定义 state descriptor 名称要稳定
- State Processor 读写都靠 state name,改名等于断档
- RocksDB 作业读状态也尽量用 RocksDB backend
- backend 不一致会直接导致读取/恢复不兼容
- 写 savepoint 的 bootstrap 作业一定是 BATCH
- 别拿 streaming env 去写,否则会踩执行模式问题
- 大状态先做“元信息盘点”
- 先
savepoint_metadata看 size、并行度、max parallelism,再决定怎么读/怎么修