Iceberg在Cloudera CDP集群详细操作步骤
在Cloudera Data Platform(CDP)集群(包括CDP Private Cloud Base/Experience 7.1.7+或CDP Public Cloud)中详细、可落地的 Apache Iceberg操作步骤,涵盖环境准备、表创建、数据操作、高级功能及运维验证,适用于生产部署。
✅一、前提条件确认
1. CDP环境要求
项目 | 要求 |
CDP版本 | Private Cloud ≥ 7.1.7 SP1(推荐7.2+);Public Cloud已启用DE/DW |
Spark版本 | Spark 3.1+(CDP默认提供Spark 3) |
Hive Metastore | 必须运行(Iceberg使用Hive Catalog) |
存储 | HDFS / S3A / Ozone(需配置读写权限) |
用户权限 | 具备Spark/Hive数据库创建权限+ HDFS/S3写权限 |
⚠️ 注意:截至 CDP 7.2,仅支持Hive Catalog,不支持 Nessie或自定义 Catalog。
🔧二、在 CDP Private Cloud中启用 Iceberg(Cloudera Manager配置)
步骤 1:登录 Cloudera Manager(CM)
- 地址:https://<cm-host>:7183
- 用户:具备Full Administrator权限
步骤 2:配置 Spark 3启用 Iceberg扩展
- 进入Spark 3 → Configuration
- 搜索并设置以下参数:
参数名 | 值 | 说明 |
spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions | 启用Iceberg SQL扩展(如MERGE INTO, CALL) |
spark.sql.catalog.spark_catalog | org.apache.iceberg.spark.SparkSessionCatalog | 将默认catalog替换为Iceberg |
spark.sql.catalog.spark_catalog.type | hive | 使用Hive Metastore作为元数据源 |
spark.sql.catalog.spark_catalog.warehouse | hdfs://nameservice1/warehouse/iceberg | (可选)指定Iceberg表默认存储路径 |
💡 提示:若使用 S3,路径应为 s3a://your-bucket/iceberg-warehouse
步骤 3:重启 Spark服务
- 在 CM 中重启 Spark 3 History Server和所有 Gateway角色
🧪三、详细操作流程(通过 Spark SQL)
以下操作可在Hue(Impala/Spark SQL)、Beeline、spark3-sql CLI或CDE Job中执行。
▶步骤 1:创建数据库和 Iceberg表
Sql:
--创建专用数据库
CREATE DATABASE IF NOT EXISTS iceberg_demo;
USE iceberg_demo;
--创建 Iceberg表(关键:USING iceberg)
CREATE TABLE sales (
order_id BIGINT,
customer_id STRING,
amount DECIMAL(10,2),
order_date DATE,
ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (order_date);
✅ 验证是否为 Iceberg 表:
Sql:
DESCRIBE FORMATTED sales;
检查输出中:
- Table Type: ICEBERG
- Provider: iceberg
- Location: hdfs://.../iceberg_demo.db/sales
▶步骤 2:插入数据(Append)
Sql:
INSERT INTO sales VALUES
(1001, 'CUST-001', 299.99, DATE '2025-12-17', CURRENT_TIMESTAMP()),
(1002, 'CUST-002', 149.50, DATE '2025-12-17', CURRENT_TIMESTAMP());
▶步骤 3:查询与 Time Travel(时间旅行)
查看快照历史
Sql:
SELECT
committed_at,
snapshot_id,
parent_id,
operation
FROM sales.snapshots;
查询当前数据
Sql:
SELECT * FROM sales;
回溯到历史快照(按 ID)
Sql:
SELECT * FROM sales VERSION AS OF 123456789012345;
按时间戳查询
Sql:
SELECT * FROM sales FOR TIMESTAMP AS OF '2025-12-17 10:00:00';
▶步骤 4:更新与删除(Row-Level Operations)
要求:CDP ≥ 7.2,且表未开启 format-version=1(默认 v2 支持 delete)
删除数据
Sql:
DELETE FROM sales WHERE order_id = 1001;
更新数据(使用 MERGE INTO实现 Upsert)
Sql:
--创建临时表
CREATE OR REPLACE TEMP VIEW updates AS
SELECT 1002 AS order_id, 'CUST-002-NEW' AS customer_id, 159.99 AS amount;
--执行 Merge
MERGE INTO sales t
USING updates s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET
customer_id = s.customer_id,
amount = s.amount,
ts = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT *;
▶步骤 5:Schema Evolution(安全改表结构)
Sql:
-- 添加新列
ALTER TABLE sales ADD COLUMN channel STRING COMMENT '销售渠道';
-- 重命名列(CDP 7.2+)
ALTER TABLE sales RENAME COLUMN channel TO sales_channel;
-- 删除列(谨慎!)
ALTER TABLE sales DROP COLUMN sales_channel;
✅ 所有变更均记录在 metadata 中,旧快照仍可查询原始 schema。
▶步骤 6:分区演化(Partition Evolution)
假设原表按 order_date 分区,现需改为按 bucket(customer_id, 8):
Sql:
-- 添加新分区字段(隐式)
ALTER TABLE sales
ADD PARTITION FIELD bucket(customer_id, 8) AS customer_bucket;
-- 后续写入将自动使用新分区策略
INSERT INTO sales VALUES (1003, 'CUST-003', 99.99, DATE '2025-12-18', CURRENT_TIMESTAMP());
🔍 可通过 DESCRIBE sales; 查看分区字段变化。
📂四、目录结构验证(HDFS/S3)
在终端查看表物理结构:
Bash:
hdfs dfs -ls /warehouse/iceberg/iceberg_demo.db/sales/
应看到:
Text:
/metadata/
/data/
- /metadata/:包含 *.metadata.json、*.snap-*.avro 等元数据文件
- /data/:按分区组织的 Parquet 文件
🔄五、跨集群复制(Replication Manager)
前提:已安装 Replication Manager(见前文指南)
- 在 CM →Replication→Create Schedule
- 选择Hive Replication
- 配置:
- Source: 当前集群
- Destination: 目标集群
- Tables: iceberg_demo.sales
- ✅勾选 "Replicate Iceberg tables using metadata files"
- 设置调度频率(如每 30 分钟)
- 激活策略
✅ 目标集群将获得完整快照历史,支持 Time Travel。
🛠六、故障排查清单
问题现象 | 检查点 |
Table type is not ICEBERG | 是否漏写USING iceberg?是否配置了spark_catalog? |
Permission denied on HDFS path | 检查HDFS ACL:hdfs dfs -getfacl /warehouse/iceberg |
ClassNotFoundException: IcebergSparkSessionExtensions | 确认Spark 3 parcel包含Iceberg(CDP 7.1.7+默认包含) |
DELETE/MERGE not supported | 表是否为format-version=1?执行CALL spark_catalog.system.upgrade('iceberg_demo.sales')升级到v2 |
Replication失败 | 检查RM日志;确认目标集群Hive Metastore可写 |
📊七、性能与监控建议
优化项 | 建议 |
小文件合并 | 定期执行:CALL spark_catalog.system.rewrite_data_files('iceberg_demo.sales') |
过期快照清理 | CALL spark_catalog.system.expire_snapshots('iceberg_demo.sales', TIMESTAMP '2025-12-01 00:00:00') |
监控 | 通过CM → Spark History Server查看作业耗时;使用Hue查看表大小 |
✅总结:CDP中 Iceberg核心操作流