news 2026/4/15 7:50:32

PyFlink Table API Data Types DataType 是什么、UDF 类型声明怎么写、Python / Pandas 类型映射一文搞懂

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink Table API Data Types DataType 是什么、UDF 类型声明怎么写、Python / Pandas 类型映射一文搞懂

1. DataType 是什么:逻辑类型,不等于物理存储类型

DataType 描述的是表生态里一个值的逻辑类型(Logical Type),比如BIGINTVARCHARDECIMAL(10,2)ROW<...>

关键理解点:

  • DataType 只定义“是什么类型”,不暗示它在网络传输或存储时怎么编码(那是物理表示层的事情)。
  • PyFlink 中所有预定义类型都在pyflink.table.types,并且推荐用DataTypes工具类来构造。

典型写法:

frompyflink.table.typesimportDataTypes DataTypes.BIGINT()DataTypes.STRING()DataTypes.ROW([DataTypes.FIELD("id",DataTypes.BIGINT()),DataTypes.FIELD("name",DataTypes.STRING())])

在写 UDF 时,你常会看到两种声明方式:

  • 用字符串:result_type='ROW<id BIGINT, data STRING>'
  • 用 DataTypes:result_type=DataTypes.ROW([...])

两者都行;工程里建议 DataTypes 写法更安全(IDE 友好,拼写不易错)。

2. DataType 与 Python 类型映射:UDF 参数/返回值怎么落地

当你在 UDF 声明了 DataType,Flink 会做两件事:

1)把输入列值转换成对应的 Python 对象传给你
2)要求你的返回值类型匹配你声明的 DataType(否则运行时会报类型/序列化问题)

你给的映射表可以直接当“速查表”。

2.1 标量 UDF:DataType → Python Type

Data TypePython Type
BOOLEANbool
TINYINT / SMALLINT / INT / BIGINTint
FLOAT / DOUBLEfloat
VARCHARstr
VARBINARYbytes
DECIMALdecimal.Decimal
DATEdatetime.date
TIMEdatetime.time
TimestampTypedatetime.datetime
LocalZonedTimestampTypedatetime.datetime
INTERVAL YEAR TO MONTHint(注意:pandas 不支持)
INTERVAL DAY TO SECONDdatetime.timedelta(注意:pandas 不支持)
ARRAYlist
MULTISETlist(Not Supported Yet in pandas)
MAPdict(Not Supported Yet in pandas)
ROWpyflink.common.Row

2.2 向量化 pandas UDF:输入/输出是 pandas.Series(元素类型由 DataType 决定)

文档里的核心点是:

对于 vectorized Python UDF,输入类型和输出类型是pandas.Series,Series 里每个元素类型对应你声明的 DataType。

映射中 pandas type 这一列很关键:

Data TypePandas Type
BOOLEANnumpy.bool_
INTnumpy.int32
BIGINTnumpy.int64
FLOATnumpy.float32
DOUBLEnumpy.float64
VARCHARstr
VARBINARYbytes
DECIMALdecimal.Decimal
DATE / TIME / Timestampdatetime.*
ARRAYnumpy.ndarray
ROWdict

注意:表里也写了很多Not Supported Yet,比如MAP/MULTISET/INTERVAL在 pandas UDF 场景还不支持或不完整,生产里尽量绕开或提前验证。

3. 复合类型在 UDF 中的使用:ARRAY / MAP / ROW

这是工程里最容易踩坑的部分:复合类型传到 Python 侧会变成什么?

3.1 ARRAY:Python 侧是 list(pandas 侧是 ndarray)

  • 标量 UDF:拿到的是list
  • pandas UDF:单元格可能是numpy.ndarray

示例(标量 UDF):

frompyflink.table.udfimportudffrompyflink.table.typesimportDataTypes@udf(input_types=[DataTypes.ARRAY(DataTypes.INT())],result_type=DataTypes.INT())defarray_len(arr):returnlen(arr)ifarrisnotNoneelse0

3.2 MAP:Python 侧是 dict(pandas UDF 暂不支持)

MAP 在 Python 标量 UDF 中是dict,但 pandas UDF 不支持(表里明确写了 Not Supported Yet),如果你需要向量化处理 MAP,通常要先在 SQL/Table 层把 MAP 展开/转换成基础列。

3.3 ROW:Python 侧是 Row(pandas UDF 侧是 dict)

  • 标量 UDF:ROW →pyflink.common.Row
  • pandas UDF:ROW →dict

这也是为什么你在 Row-based Operations 里看到两种写法:

  • def func2(r: Row) -> Row: ...
  • pandas 模式下DataFrame里取列,再拼DataFrame返回

4. 为什么“声明类型”很重要:类型决定运行时转换与序列化

在 PyFlink 里,很多问题的根源都是“类型不明确”:

  • 你写 UDF 返回了 Pythonint,但声明的是STRING
  • 你返回 Row 的字段数量/顺序与声明的ROW<...>不一致
  • pandas UDF 返回列的 dtype 跟 DataType 冲突
  • 你用了 pandas UDF 但传了 MAP/INTERVAL 等 pandas 不支持类型

最稳的做法是:

  • UDF 明确写input_types/result_type(不要只靠推断)
  • 复合类型尽量用DataTypes.ROW/ARRAY/...明确结构
  • pandas UDF 场景优先用基础数值/字符串/时间类型

5. 给你一套“UDF 类型声明模板”(生产更稳)

5.1 标量 UDF:建议用 DataTypes 写清楚

frompyflink.table.udfimportudffrompyflink.table.typesimportDataTypes@udf(input_types=[DataTypes.BIGINT(),DataTypes.STRING()],result_type=DataTypes.ROW([DataTypes.FIELD("id",DataTypes.BIGINT()),DataTypes.FIELD("data",DataTypes.STRING())]))defenrich(id_,data):frompyflink.commonimportRowreturnRow(id_,data+"_x")

5.2 pandas UDF:牢记输入输出是 Series/DataFrame(元素类型由 DataType 控制)

importpandasaspdfrompyflink.table.udfimportudffrompyflink.table.typesimportDataTypes@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id",DataTypes.BIGINT()),DataTypes.FIELD("data",DataTypes.STRING())]),func_type="pandas")defenrich_vec(df:pd.DataFrame)->pd.DataFrame:returnpd.concat([df["id"],df["data"]+"_x"],axis=1)

如果你接下来要写 CSDN 系列文章,我建议你把这一篇作为“类型基础篇”,下一篇可以直接承接你前面写的 Row-based Operations:把每种算子(map/flat_map/aggregate/flat_aggregate)里涉及到的result_typeinput_types、ROW 扁平化、pandas 模式的 dtype 坑,都用本文的类型映射做解释,会非常连贯。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 4:10:39

直播画面内容审核:实时识别违规视觉元素

直播画面内容审核&#xff1a;实时识别违规视觉元素 技术背景与行业挑战 随着直播电商、社交直播和在线教育的迅猛发展&#xff0c;实时内容安全审核已成为平台运营的核心需求。传统的人工审核模式在面对海量并发流媒体时&#xff0c;存在响应延迟高、人力成本大、覆盖不全面等…

作者头像 李华
网站建设 2026/4/15 0:42:48

为什么你的MCP加密仍不安全?揭秘8大常见配置陷阱

第一章&#xff1a;为什么你的MCP加密仍不安全&#xff1f;揭秘8大常见配置陷阱许多企业认为只要启用了MCP&#xff08;Message Confidentiality Protocol&#xff09;加密&#xff0c;通信数据就自动处于安全状态。然而&#xff0c;错误的配置会严重削弱加密机制的实际防护能力…

作者头像 李华
网站建设 2026/4/13 20:58:10

基于单片机的塑料厂房气体检测系统设计

摘 要 目前&#xff0c;在社会主义现代化建设中&#xff0c;火灾不断的增多&#xff0c;而在塑料厂房中大部分火灾都是因为生产塑料的机器不断的运转导致机体周围的温度不断升高&#xff0c;超过了可燃气的燃点&#xff0c;或者是生产塑料的机器昌盛的可燃气过多&#xff0c;从…

作者头像 李华
网站建设 2026/4/14 7:34:35

万物识别模型轻量化:在低配GPU上运行中文AI

万物识别模型轻量化&#xff1a;在低配GPU上运行中文AI 如果你是一位开发者&#xff0c;想要将中文物体识别功能集成到移动应用中&#xff0c;但发现云端API成本太高&#xff0c;那么这篇文章就是为你准备的。本文将详细介绍如何在消费级GPU上高效运行轻量化的万物识别模型&…

作者头像 李华
网站建设 2026/4/14 10:53:58

华为宣布鸿蒙星河版四季度商用,“生态进入第二阶段”

鸿蒙devecostudio 1月18日&#xff0c;在鸿蒙生态千帆启航仪式上&#xff0c;华为宣布原生鸿蒙操作系统星河版&#xff08;HarmonyOSNEXT&#xff0c;鸿蒙星河版&#xff09;面向开发者开放申请。观察者网在现场了解到&#xff0c;鸿蒙星河版将在今年二季度推出开发者Beta版&am…

作者头像 李华
网站建设 2026/4/12 3:47:00

十分钟搭建属于你的视觉搜索引擎

十分钟搭建属于你的视觉搜索引擎 作为一名收藏爱好者&#xff0c;你是否曾为整理数千张古董照片而头疼&#xff1f;想要快速找到特定材质或年代的藏品&#xff0c;却苦于没有合适的工具&#xff1f;本文将介绍如何利用现成的视觉识别服务&#xff0c;十分钟内搭建一个专属于你…

作者头像 李华