跳到主要内容

使用 Stream 追踪与转换数据

Stream 是 Databend 用来记录行级变更的“增量表”。每次提交 INSERT/UPDATE/DELETE,Stream 都会缓存对应的最终状态,直到消费为止。本页沿用英语版的三个动手示例,帮助你快速体会两种模式以及增量计算的写法。

模式速览

模式捕获内容适用场景
Standard Stream(APPEND_ONLY = falseINSERT + UPDATE + DELETE,并在被消费前合并为每行的最新状态需要完整记录变更、可回放更新/删除
Append-Only Stream(默认,APPEND_ONLY = true只捕获 INSERT纯追加型事实/日志流水

Stream 不复制整张表,只保留“尚未消费的增量”。消费由谁触发、何时触发完全由你掌控。

示例 1:Append-Only Stream

步骤 1: 创建基表与 Stream

CREATE OR REPLACE TABLE sensor_readings (
sensor_id INT,
temperature DOUBLE
);

CREATE OR REPLACE STREAM sensor_readings_stream
ON TABLE sensor_readings; -- APPEND_ONLY 默认即为 true

步骤 2: 插入并查看增量

INSERT INTO sensor_readings VALUES (1, 21.5), (2, 19.7);

SELECT sensor_id, temperature, change$action, change$is_update
FROM sensor_readings_stream;
┌────────────┬───────────────┬───────────────┬──────────────────┐
│ sensor_id │ temperature │ change$action │ change$is_update │
├────────────┼───────────────┼───────────────┼──────────────────┤
│ 1 │ 21.5 │ INSERT │ false │
│ 2 │ 19.7 │ INSERT │ false │
└────────────┴───────────────┴───────────────┴──────────────────┘

步骤 3: 消费并写入目标表

CREATE OR REPLACE TABLE sensor_readings_latest AS
SELECT sensor_id, temperature
FROM sensor_readings_stream;

SELECT * FROM sensor_readings_stream; -- 已为空

示例 2:Standard Stream(含 UPDATE / DELETE)

步骤 1: 为同一张表建立 Standard Stream

CREATE OR REPLACE STREAM sensor_readings_stream_std
ON TABLE sensor_readings
APPEND_ONLY = false;

步骤 2: 执行 UPDATE / DELETE / INSERT 并比较两个 Stream

UPDATE sensor_readings SET temperature = 22 WHERE sensor_id = 1; -- 更新
DELETE FROM sensor_readings WHERE sensor_id = 2; -- 删除
INSERT INTO sensor_readings VALUES (3, 18.5); -- 新增

SELECT * FROM sensor_readings_stream; -- 仍为空(Append-Only Stream 模式忽略 UPDATE / DELETE)

SELECT sensor_id, temperature, change$action, change$is_update
FROM sensor_readings_stream_std
ORDER BY change$row_id;
┌────────────┬───────────────┬───────────────┬──────────────────┐
│ sensor_id │ temperature │ change$action │ change$is_update │
├────────────┼───────────────┼───────────────┼──────────────────┤
│ 1 │ 21.5 │ DELETE │ true │
│ 1 │ 22 │ INSERT │ true │
│ 2 │ 19.7 │ DELETE │ false │
│ 3 │ 18.5 │ INSERT │ false │
└────────────┴───────────────┴───────────────┴──────────────────┘

更新对应“删除旧值 + 写入新值”,独立的 DELETE/INSERT 则各自占一条,因而在消费前始终能拿到最终状态。

示例 3:增量 Join 与计算

多条 Stream 可以在需要时“一次性消费”。借助 WITH CONSUME,每次只处理尚未消费的增量,既适合批式作业也能容忍不同步到达的变更。

步骤 1: 创建基表与 Stream

CREATE OR REPLACE TABLE customers (
customer_id INT,
segment VARCHAR,
city VARCHAR
);

CREATE OR REPLACE TABLE orders (
order_id INT,
customer_id INT,
amount DOUBLE
);

CREATE OR REPLACE STREAM customers_stream ON TABLE customers;
CREATE OR REPLACE STREAM orders_stream ON TABLE orders;

步骤 2: 首批数据

INSERT INTO customers VALUES
(101, 'VIP', 'Shanghai'),
(102, 'Standard', 'Beijing'),
(103, 'VIP', 'Shenzhen');

INSERT INTO orders VALUES
(5001, 101, 199.0),
(5002, 101, 59.0),
(5003, 102, 89.0);

步骤 3: 第一次增量聚合

WITH
orders_delta AS (
SELECT customer_id, amount
FROM orders_stream WITH CONSUME
),
customers_delta AS (
SELECT customer_id, segment
FROM customers_stream WITH CONSUME
)
SELECT
o.customer_id,
c.segment,
SUM(o.amount) AS incremental_sales
FROM orders_delta AS o
JOIN customers_delta AS c
ON o.customer_id = c.customer_id
GROUP BY o.customer_id, c.segment
ORDER BY o.customer_id;
┌──────────────┬───────────┬────────────────────┐
│ customer_id │ segment │ incremental_sales │
├──────────────┼───────────┼────────────────────┤
│ 101 │ VIP │ 258.0 │
│ 102 │ Standard │ 89.0 │
└──────────────┴───────────┴────────────────────┘

步骤 4: 下一批到达后再次运行

INSERT INTO customers VALUES (104, 'Standard', 'Guangzhou');
INSERT INTO orders VALUES (5004, 101, 40.0), (5005, 104, 120.0);

WITH
orders_delta AS (
SELECT customer_id, amount
FROM orders_stream WITH CONSUME
),
customers_delta AS (
SELECT customer_id, segment
FROM customers_stream WITH CONSUME
)
SELECT
o.customer_id,
c.segment,
SUM(o.amount) AS incremental_sales
FROM orders_delta AS o
JOIN customers_delta AS c
ON o.customer_id = c.customer_id
GROUP BY o.customer_id, c.segment
ORDER BY o.customer_id;
┌──────────────┬───────────┬────────────────────┐
│ customer_id │ segment │ incremental_sales │
├──────────────┼───────────┼────────────────────┤
│ 101 │ VIP │ 40.0 │
│ 104 │ Standard │ 120.0 │
└──────────────┴───────────┴────────────────────┘

第二次运行自动只消费最新的增量,允许订单和客户信息在不同时间到达。

使用提示

消费语义

  • INSERT INTO target SELECT ... FROM stream 成功提交后才会清空 Stream,失败则回滚。
  • 同一条 Stream 同时只能被一个语句消费,其余会被回滚。

模式选择

  • Append-Only Stream 专注 INSERT,是事件、日志入湖的最佳拍档。
  • Standard Stream 能保留 UPDATE / DELETE 前后的最终状态,适合需要完整变更信息的场景。

隐藏列

  • 查询 Stream 时可使用 change$actionchange$is_updatechange$row_id 判断每条增量。
  • 基表上还有 _origin_version_origin_block_id_origin_block_row_num,方便排查“最终值从何而来”。

联动任务

  • 结合 Task 可以按计划自动消费 Stream,使用 task_history('<name>', <limit>) 查看执行记录。
  • SQL 中搭配 WITH CONSUME,即可在批式作业里只处理“本次新增”的数据。