1. 核心语义:HBase 永远是 Upsert
- HBase Connector始终按 Upsert 模式交换 changelog(支持 UPDATE/DELETE 的那套语义)
- 必须有 rowkey 字段(表里必须声明一个原子类型字段作为 rowkey)
- PRIMARY KEY 必须定义在 rowkey 上;如果不写 PRIMARY KEY,connector 默认把 rowkey 当主键
一句话:HBase 表的主键就是 rowkey,Flink 也要求你按这个规则来。
2. 表结构映射:列族必须用 ROW 类型声明
HBase 的数据模型:rowkey + column family + qualifier + value
Flink SQL 里映射规则:
除 ROW 类型字段之外的单一原子类型字段会被识别为rowkey
每个列族(family)必须声明为 ROW<…>
- ROW 字段名 = column family 名
- ROW 内嵌字段名 = qualifier 名
你不需要把 HBase 的所有列族/qualifier 都声明出来,只声明你查询/写入会用到的就行。
2.1 建表示例(官方格式)
CREATETABLEhTable(rowkeyINT,family1ROW<q1INT>,family2ROW<q2 STRING,q3BIGINT>,family3ROW<q4DOUBLE,q5BOOLEAN,q6 STRING>,PRIMARYKEY(rowkey)NOTENFORCED)WITH('connector'='hbase-2.2','table-name'='mytable','zookeeper.quorum'='localhost:2181');注意:
- rowkey 字段名可以随便起,但如果是 SQL 关键字要用反引号(
rowkey) table-name默认 namespace 是default,指定 namespace 用namespace:table
3. 写入:用 ROW(…) 构造列族
写入 HBase 时,每个列族要传一个 ROW 值:
INSERTINTOhTableSELECTrowkey,ROW(f1q1),ROW(f2q2,f2q3),ROW(f3q4,f3q5,f3q6)FROMT;这里ROW(...)的位置要和 DDL 中列族字段的声明顺序一致。
3.1 是否写入 NULL:sink.ignore-null-value
- 默认
sink.ignore-null-value = false:null 也会写(具体到 HBase 是空 bytes 的编码逻辑,见后文) - 如果你希望“字段为 null 就不覆盖 HBase 里已有值”,可以考虑:
'sink.ignore-null-value'='true'这在做“增量补字段”或“只更新非空字段”的场景很有用。
4. 读取:Scan 与维表 Join
4.1 扫描查询(Bounded Scan)
SELECTrowkey,family1,family3.q4,family3.q6FROMhTable;4.2 Temporal Join:把 HBase 当维表
SELECT*FROMmyTopicLEFTJOINhTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=hTable.rowkey;- Lookup Source:同步(Sync Mode)
- 可选开启异步 lookup(仅 hbase-2.2 支持):
'lookup.async'='true'异步 lookup 的典型价值:维表查询慢或并发大时,减少算子阻塞,提高吞吐。
5. Metadata:写入时可以指定 HBase mutation 的 timestamp/ttl
HBase connector 支持两个可写元数据列(W):
timestamp:TIMESTAMP_LTZ(3) —— mutation 时间戳ttl:BIGINT —— mutation TTL(毫秒)
用法要点:
- 这是写入相关的元数据列;如果是只读列通常要 VIRTUAL,但这里是可写字段(W)
你可以在表里加上元数据列(示意):
CREATETABLEhTable(rowkeyINT,family1ROW<q1INT>,ts TIMESTAMP_LTZ(3)METADATAFROM'timestamp',ttl_msBIGINTMETADATAFROM'ttl',PRIMARYKEY(rowkey)NOTENFORCED)WITH('connector'='hbase-2.2','table-name'='mytable','zookeeper.quorum'='localhost:2181');然后 insert 时把 ts/ttl 一并写进去(适合“按业务时间回写版本”或“写入即过期”这类需求)。
6. 写入性能:三件套(max-size / max-rows / interval)
HBase sink 内置缓冲,靠批量 flush 提升吞吐:
sink.buffer-flush.max-size(默认 2mb)sink.buffer-flush.max-rows(默认 1000)sink.buffer-flush.interval(默认 1s)
含义:达到任意阈值就触发一次 flush。
建议思路:
- 吞吐优先:提高 max-rows / max-size,适当拉大 interval
代价:端到端延迟增加,checkpoint/反压风险上升 - 延迟优先:缩小 interval(甚至 100~500ms),控制 max-rows
代价:写入批量变小,吞吐下降,HBase 压力可能更大 - 想完全异步依赖 interval:可以把 max-size/max-rows 置 0,再用 interval 控制(文档允许)
7. Lookup 缓存:PARTIAL 缓存怎么配
Lookup 默认不缓存(NONE)。开启:
'lookup.cache'='PARTIAL','lookup.partial-cache.max-rows'='100000','lookup.partial-cache.expire-after-write'='10 min','lookup.partial-cache.expire-after-access'='5 min','lookup.partial-cache.caching-missing-key'='true'关键点:
- 缓存是TaskManager 进程级,每个 TM 一份
caching-missing-key=true会把“查不到”的结果也缓存,能减少热点 miss 的反复 IO,但如果维表会新增 key,可能短时间内“查不到”被缓存住- TTL 越短越接近实时,但外部请求越多;TTL 越长性能越好但数据更“旧”
如果你的维表更新很频繁:TTL 不要太长;或者不缓存。
8. HBase 配置透传:properties.*(Kerberos 等)
properties.*可以透传任意 HBase 配置:
'properties.hbase.security.authentication'='kerberos'Flink 会去掉properties.前缀,把剩余 key/value 交给底层 HBase Client。
这对生产环境很关键:Kerberos、RPC 超时、重试、连接池相关参数都靠它。
9. 数据类型映射与“空 bytes = null”规则(超级容易踩)
HBase 存 byte[],Flink HBase connector 用org.apache.hadoop.hbase.util.Bytes做序列化/反序列化。
核心规则:
- 除 STRING 外的所有类型:
写入 null → 编码成 empty bytes
读取 empty bytes → 解码成 null - STRING 类型例外:
empty bytes 会被当成一个“特殊 null literal”,由null-string-literal决定(默认是"null"字符串)
如果你业务里字符串可能真的会出现"null"这样的值,建议明确设置null-string-literal,避免歧义:
'null-string-literal'='__HBASE_NULL__'10. 一个更“生产味”的 DDL 模板(可直接套)
CREATETABLEuser_profile_hbase(user_id STRING,-- rowkey(原子类型字段)baseROW<name STRING,ageINT,statusBOOLEAN>,-- 列族 basestatROW<uvBIGINT,pvBIGINT>,-- 列族 statPRIMARYKEY(user_id)NOTENFORCED)WITH('connector'='hbase-2.2','table-name'='ns:user_profile','zookeeper.quorum'='zk1:2181,zk2:2181,zk3:2181','zookeeper.znode.parent'='/hbase','null-string-literal'='__HBASE_NULL__','sink.buffer-flush.max-rows'='2000','sink.buffer-flush.max-size'='4mb','sink.buffer-flush.interval'='1s','sink.ignore-null-value'='true','lookup.async'='true','lookup.cache'='PARTIAL','lookup.partial-cache.max-rows'='200000','lookup.partial-cache.expire-after-write'='10 min','lookup.partial-cache.caching-missing-key'='true');你接下来如果要把它写成发布到 CSDN 的博客,我也能按“实战场景”帮你补齐两段很关键的内容:
- RowKey 设计与热点规避(salt/hash 前缀、时间倒排、业务分桶)
- 一致性语义与幂等写(upsert + checkpoint + 重放时的覆盖行为,以及 null 覆盖问题怎么处理)