跳到主要内容

Apache Iceberg

Introduced or updated: v1.2.668

Databend 支持集成 Apache Iceberg catalog,从而增强了其数据管理和分析的兼容性和多功能性。通过将 Apache Iceberg 强大的元数据和存储管理功能无缝集成到平台中,扩展了 Databend 的功能。

Apache Iceberg 快速入门

如果您想快速试用 Apache Iceberg 并在本地进行表操作实验,可以使用 基于 Docker 的入门项目。此设置允许您:

  • 运行支持 Iceberg 的 Spark
  • 使用 REST catalog (Iceberg REST Fixture)
  • 使用 MinIO 模拟 S3 兼容的对象存储
  • 将示例 TPC-H 数据加载到 Iceberg 表中以进行查询测试

前提条件

在开始之前,请确保您的系统上已安装 Docker 和 Docker Compose。

启动 Iceberg 环境

git clone https://github.com/databendlabs/iceberg-quick-start.git
cd iceberg-quick-start
docker compose up -d

这将启动以下服务:

  • spark-iceberg: 带有 Iceberg 的 Spark 3.4
  • rest: Iceberg REST Catalog
  • minio: S3 兼容的对象存储
  • mc: MinIO 客户端(用于设置存储桶)
WARN[0000] /Users/eric/iceberg-quick-start/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion
[+] Running 5/5
✔ Network iceberg-quick-start_iceberg_net Created 0.0s
✔ Container iceberg-rest-test Started 0.4s
✔ Container minio Started 0.4s
✔ Container mc Started 0.6s
✔ Container spark-iceberg S... 0.7s

通过 Spark Shell 加载 TPC-H 数据

运行以下命令以生成示例 TPC-H 数据并将其加载到 Iceberg 表中:

docker exec spark-iceberg bash /home/iceberg/load_tpch.sh
Collecting duckdb
Downloading duckdb-1.2.2-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl (18.7 MB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 18.7/18.7 MB 5.8 MB/s eta 0:00:00
Requirement already satisfied: pyspark in /opt/spark/python (3.5.5)
Collecting py4j==0.10.9.7
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.5/200.5 kB 5.9 MB/s eta 0:00:00
Installing collected packages: py4j, duckdb
Successfully installed duckdb-1.2.2 py4j-0.10.9.7

[notice] A new release of pip is available: 23.0.1 -> 25.1.1
[notice] To update, run: pip install --upgrade pip
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/07 12:17:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/07 12:17:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[2025-05-07 12:17:18] [INFO] Starting TPC-H data generation and loading process
[2025-05-07 12:17:18] [INFO] Configuration: Scale Factor=1, Data Dir=/home/iceberg/data/tpch_1
[2025-05-07 12:17:18] [INFO] Generating TPC-H data with DuckDB (Scale Factor: 1)
[2025-05-07 12:17:27] [INFO] Generated 8 Parquet files in /home/iceberg/data/tpch_1
[2025-05-07 12:17:28] [INFO] Loading data into Iceberg catalog
[2025-05-07 12:17:33] [INFO] Created Iceberg table: demo.tpch.part from part.parquet
[2025-05-07 12:17:33] [INFO] Created Iceberg table: demo.tpch.region from region.parquet
[2025-05-07 12:17:33] [INFO] Created Iceberg table: demo.tpch.supplier from supplier.parquet
[2025-05-07 12:17:35] [INFO] Created Iceberg table: demo.tpch.orders from orders.parquet
[2025-05-07 12:17:35] [INFO] Created Iceberg table: demo.tpch.nation from nation.parquet
[2025-05-07 12:17:40] [INFO] Created Iceberg table: demo.tpch.lineitem from lineitem.parquet
[2025-05-07 12:17:40] [INFO] Created Iceberg table: demo.tpch.partsupp from partsupp.parquet
[2025-05-07 12:17:41] [INFO] Created Iceberg table: demo.tpch.customer from customer.parquet
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| tpch| customer| false|
| tpch| lineitem| false|
| tpch| nation| false|
| tpch| orders| false|
| tpch| part| false|
| tpch| partsupp| false|
| tpch| region| false|
| tpch| supplier| false|
+---------+---------+-----------+

[2025-05-07 12:17:42] [SUCCESS] TPCH data generation and loading completed successfully

在 Databend 中查询数据

加载 TPC-H 表后,您可以在 Databend 中查询数据:

  1. 在 Docker 中启动 Databend:
docker network create iceberg_net
docker run -d \
--name databend \
--network iceberg_net \
-p 3307:3307 \
-p 8000:8000 \
-p 8124:8124 \
-p 8900:8900 \
datafuselabs/databend
  1. 首先使用 BendSQL 连接到 Databend,然后创建一个 Iceberg catalog:
bendsql
Welcome to BendSQL 0.24.1-f1f7de0(2024-12-04T12:31:18.526234000Z).
Connecting to localhost:8000 as user root.
Connected to Databend Query v1.2.725-8d073f6b7a(rust-1.88.0-nightly-2025-04-21T11:49:03.577976082Z)
Loaded 1436 auto complete keywords from server.
Started web server at 127.0.0.1:8080
CREATE CATALOG iceberg TYPE = ICEBERG CONNECTION = (
TYPE = 'rest'
ADDRESS = 'http://host.docker.internal:8181'
warehouse = 's3://warehouse/wh/'
"s3.endpoint" = 'http://host.docker.internal:9000'
"s3.access-key-id" = 'admin'
"s3.secret-access-key" = 'password'
"s3.region" = 'us-east-1'
);
  1. 使用新创建的 catalog:
USE CATALOG iceberg;
  1. 显示可用的数据库:
SHOW DATABASES;
╭──────────────────────╮
│ databases_in_iceberg │
│ String │
├──────────────────────┤
│ tpch │
╰──────────────────────╯
  1. 运行示例查询以聚合 TPC-H 数据:
SELECT
l_returnflag,
l_linestatus,
SUM(l_quantity) AS sum_qty,
SUM(l_extendedprice) AS sum_base_price,
SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
AVG(l_quantity) AS avg_qty,
AVG(l_extendedprice) AS avg_price,
AVG(l_discount) AS avg_disc,
COUNT(*) AS count_order
FROM
iceberg.tpch.lineitem
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus;
┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ l_returnflag │ l_linestatus │ sum_qty │ sum_base_price │ sum_disc_price │ sum_charge │ avg_qty │ avg_price │ avg_disc │ count_order │
│ Nullable(String) │ Nullable(String) │ Nullable(Decimal(38, 2)) │ Nullable(Decimal(38, 2)) │ Nullable(Decimal(38, 4)) │ Nullable(Decimal(38, 6)) │ Nullable(Decimal(38, 8)) │ Nullable(Decimal(38, 8)) │ Nullable(Decimal(38, 8)) │ UInt64 │
├──────────────────┼──────────────────┼──────────────────────────┼──────────────────────────┼──────────────────────────┼──────────────────────────┼──────────────────────────┼──────────────────────────┼──────────────────────────┼─────────────┤
│ A │ F │ 37734107.0056586554400.7353758257134.870055909065222.82769225.5220058538273.129734620.049985301478493
│ N │ F │ 991417.001487504710.381413082168.05411469649223.19437525.5164719238284.467760850.0500934338854
│ N │ O │ 76633518.00114935210409.19109189591897.4720113561024263.01378225.5020196438248.015609060.050000263004998
│ R │ F │ 37719753.0056568041380.9053741292684.604055889619119.83193225.5057936138250.854626100.050009411478870
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

数据类型映射

下表映射了 Apache Iceberg 和 Databend 之间的数据类型。请注意,Databend 目前不支持表中未列出的 Iceberg 数据类型。

Apache IcebergDatabend
BOOLEANBOOLEAN
INTINT32
LONGINT64
DATEDATE
TIMESTAMP/TIMESTAMPZTIMESTAMP
FLOATFLOAT
DOUBLEDOUBLE
STRING/BINARYSTRING
DECIMALDECIMAL
ARRAY<TYPE>ARRAY, 支持嵌套
MAP<KEYTYPE, VALUETYPE>MAP
STRUCT<COL1: TYPE1, COL2: TYPE2, ...>TUPLE
LISTARRAY

管理 Catalogs

Databend 提供了以下命令来管理 catalogs:

CREATE CATALOG

在 Databend 查询引擎中定义并建立一个新的 catalog。

语法

CREATE CATALOG <catalog_name>
TYPE=ICEBERG
CONNECTION=(
TYPE='<connection_type>'
ADDRESS='<address>'
WAREHOUSE='<warehouse_location>'
"<connection_parameter>"='<connection_parameter_value>'
"<connection_parameter>"='<connection_parameter_value>'
...
);
参数是否必须?描述
<catalog_name>您要创建的 catalog 的名称。
TYPE指定 catalog 类型。对于 Iceberg,设置为 ICEBERG
CONNECTIONIceberg catalog 的连接参数。
TYPE (在 CONNECTION 中)连接类型。对于 Iceberg,通常设置为 rest 以进行基于 REST 的连接。
ADDRESSIceberg 服务的地址或 URL(例如,http://127.0.0.1:8181)。
WAREHOUSEIceberg 计算集群的位置,通常是 S3 bucket 或兼容的对象存储系统。
<connection_parameter>用于建立与外部存储连接的连接参数。所需的参数因特定的存储服务和身份验证方法而异。 有关可用参数的完整列表,请参见下表。
连接参数描述
s3.endpointS3 endpoint.
s3.access-key-idS3 access key ID.
s3.secret-access-keyS3 secret access key.
s3.session-tokenS3 session token,使用临时凭证时需要。
s3.regionS3 区域。
client.region用于 S3 客户端的区域,优先于 s3.region
s3.path-style-accessS3 Path Style Access.
s3.sse.typeS3 服务器端加密 (SSE) 类型。
s3.sse.keyS3 SSE 密钥。如果加密类型为 kms,则为 KMS 密钥 ID。如果加密类型为 custom,则为 base-64 AES256 对称密钥。
s3.sse.md5S3 SSE MD5 校验和。
client.assume-role.arn要承担的 IAM 角色的 ARN,而不是使用默认凭证链。
client.assume-role.external-id用于承担 IAM 角色的可选外部 ID。
client.assume-role.session-name用于承担 IAM 角色的可选会话名称。
s3.allow-anonymous允许匿名访问的选项(例如,对于公共存储桶/文件夹)。
s3.disable-ec2-metadata用于禁用从 EC2 元数据加载凭证的选项(通常与 s3.allow-anonymous 一起使用)。
s3.disable-config-load用于禁用从配置文件和环境变量加载配置的选项。

SHOW CREATE CATALOG

返回指定 catalog 的详细配置,包括其类型和存储参数。

语法

SHOW CREATE CATALOG <catalog_name>;

SHOW CATALOGS

显示所有已创建的 catalogs。

语法

SHOW CATALOGS [LIKE '<pattern>']

USE CATALOG

将当前会话切换到指定的 catalog。

语法

USE CATALOG <catalog_name>

缓存 Iceberg Catalog

Databend 提供了一个专门为 Iceberg catalogs 设计的 Catalog Metadata Cache。当第一次在 Iceberg 表上执行查询时,元数据会被缓存在内存中。默认情况下,此缓存保持有效 10 分钟,之后会异步刷新。这确保了对 Iceberg 表的查询更快,避免了重复的元数据检索。

如果需要最新的元数据,可以使用以下命令手动刷新缓存:

USE CATALOG iceberg;
ALTER DATABASE tpch REFRESH CACHE; -- 刷新 tpch 数据库的元数据缓存
ALTER TABLE tpch.lineitem REFRESH CACHE; -- 刷新 lineitem 表的元数据缓存

如果不想使用元数据缓存,可以通过在 databend-query.toml 配置文件中将 iceberg_table_meta_count 设置为 0 来完全禁用它:

...
# Cache config.
[cache]
...
iceberg_table_meta_count = 0
...

除了元数据缓存外,Databend 还支持 Iceberg catalog 表的表数据缓存,类似于 Fuse 表。有关数据缓存的更多信息,请参阅 Query Configurations 参考中的 [cache] Section

Iceberg Table Functions

Databend 提供了以下表函数来查询 Iceberg 元数据,允许用户有效地检查快照和清单:

使用示例

此示例展示了如何使用基于 REST 的连接创建 Iceberg catalog,指定服务地址、计算集群位置 (S3) 和可选参数,如 AWS 区域和自定义 endpoint:

CREATE CATALOG ctl
TYPE=ICEBERG
CONNECTION=(
TYPE='rest'
ADDRESS='http://127.0.0.1:8181'
WAREHOUSE='s3://iceberg-tpch'
"s3.region"='us-east-1'
"s3.endpoint"='http://127.0.0.1:9000'
);