通过流跟踪和转换数据
Databend 中的流是表更改的动态和实时表示。创建流是为了捕获和跟踪对关联表的修改,从而可以持续使用和分析发生的数据更改。
Stream 的工作原理
流可以在两种模式下运行:Standard 和 Append-Only。使用 APPEND_ONLY
参数(默认为 true
)在 CREATE STREAM 时指定模式。
- Standard:捕获所有类型的数据更改,包括插入、更新和删除。
- Append-Only:在此模式下,流仅包含数据插入记录;不捕获数据更新或删除。
Databend 流的设计理念是专注于捕获数据的最终状态。例如,如果您插入一个值,然后多次更新它,则流仅保留该值在被使用之前的最新状态。以下示例说明了流的外观以及它在两种模式下的工作方式。
创建流以捕获更改
让我们首先创建两个表,然后为每个表创建具有不同模式的流,以捕获对表的更改。
-- 创建一个表并插入一个值
CREATE TABLE t_standard(a INT);
CREATE TABLE t_append_only(a INT);
-- 创建两个具有不同模式的流:Standard 和 Append_Only
CREATE STREAM s_standard ON TABLE t_standard APPEND_ONLY=false;
CREATE STREAM s_append_only ON TABLE t_append_only APPEND_ONLY=true;
您可以使用 SHOW FULL STREAMS 命令查看已创建的流及其模式:
SHOW FULL STREAMS;
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ created_on │ name │ database │ catalog │ table_on │ owner │ comment │ mode │ invalid_reason │
├────────────────────────────┼───────────────┼──────────┼─────────┼───────────────────────┼──────────────────┼─────────┼─────────────┼────────────────┤
│ 2024-02-18 16:39:58.996763 │ s_append_only │ default │ default │ default.t_append_only │ NULL │ │ append_only │ │
│ 2024-02-18 16:39:58.966942 │ s_standard │ default │ default │ default.t_standard │ NULL │ │ standard │ │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
现在,让我们向每个表中插入两个值,并观察流捕获的内容:
-- 插入两个新值
INSERT INTO t_standard VALUES(2), (3);
INSERT INTO t_append_only VALUES(2), (3);
SELECT * FROM s_standard;
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$row_id │ change$is_update │
├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤
│ 2 │ INSERT │ 8cd000827f8140d9921f897016e5a88e000000 │ false │
│ 3 │ INSERT │ 8cd000827f8140d9921f897016e5a88e000001 │ false │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
SELECT * FROM s_append_only;
┌─────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$is_update │ change$row_id │
├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤
│ 2 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000000 │
│ 3 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000001 │
└─────────────────────────────────────────────────────────────────────────────────────────────┘
以上结果表明,两个流都已成功捕获新的插入。有关结果中流列的详细信息,请参见 Stream Columns。现在,让我们更新然后删除一个新插入的值,并检查流的捕获中是否存在差异。
UPDATE t_standard SET a = 4 WHERE a = 2;
UPDATE t_append_only SET a = 4 WHERE a = 2;
SELECT * FROM s_standard;
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$row_id │ change$is_update │
│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │
├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤
│ 4 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000000 │ false │
│ 3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
SELECT * FROM s_append_only;
┌─────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$is_update │ change$row_id │
├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤
│ 4 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000000 │
│ 3 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000001 │
└─────────────────────────────────────────────────────────────────────────────────────────────┘
DELETE FROM t_standard WHERE a = 4;
DELETE FROM t_append_only WHERE a = 4;
SELECT * FROM s_standard;
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$row_id │ change$is_update │
│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │
├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤
│ 3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
SELECT * FROM s_append_only;
┌─────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$is_update │ change$row_id │
├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤
│ 3 │ INSERT │ false │ bfed6c91f3e4402fa477b6853a2d2b58000001 │
└─────────────────────────────────────────────────────────────────────────────────────────────┘
到目前为止,由于我们尚未处理流,因此我们尚未注意到两种模式之间的任何显着差异。所有更改都已合并并体现为 INSERT 操作。流可以被任务、DML(数据操作语言)操作或带有 WITH CONSUME 或 WITH Stream Hints 的查询使用。使用后,流不包含任何数据,但可以继续捕获新的更改(如果有)。为了进一步分析差异,让我们继续使用流并检查输出。
使用流
让我们创建两个新表,并将流捕获的内容插入到其中。
CREATE TABLE t_consume_standard(b INT);
CREATE TABLE t_consume_append_only(b INT);
INSERT INTO t_consume_standard SELECT a FROM s_standard;
INSERT INTO t_consume_append_only SELECT a FROM s_append_only;
SELECT * FROM t_consume_standard;
┌─────────────────┐
│ b │
├─────────────────┤
│ 3 │
└─────────────────┘
SELECT * FROM t_consume_append_only;
┌─────────────────┐
│ b │
├─────────────────┤
│ 3 │
└─────────────────┘
如果您现在查询流,您会发现它们是空的,因为它们已被使用。
-- 空结果
SELECT * FROM s_standard;
-- 空结果
SELECT * FROM s_append_only;
捕获新更改
现在,让我们在每个表中将值从 3
更新为 4
,然后再次检查它们的流:
UPDATE t_standard SET a = 4 WHERE a = 3;
UPDATE t_append_only SET a = 4 WHERE a = 3;
SELECT * FROM s_standard;
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$row_id │ change$is_update │
│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │
├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤
│ 3 │ DELETE │ 1dd5cab0b1b64328a112db89d602ca04000001 │ true │
│ 4 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ true │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
-- 空结果
SELECT * FROM s_append_only;
以上结果表明,Standard 流将 UPDATE 操作处理为两个操作的组合:一个 DELETE 操作,用于删除旧值(3
),以及一个 INSERT 操作,用于添加新值(4
)。当将 3
更新为 4
时,必须首先删除现有值 3
,因为它不再存在于最终状态中,然后插入新值 4
。此行为反映了 Standard 流如何仅捕获最终更改,将更新表示为同一行的删除(删除旧值)和插入(添加新值)的序列。
另一方面,Append_Only 流不捕获任何内容,因为它旨在仅记录新数据添加(INSERT),而忽略更新或删除。
如果现在删除值 4
,我们可以获得以下结果:
DELETE FROM t_standard WHERE a = 4;
DELETE FROM t_append_only WHERE a = 4;
SELECT * FROM s_standard;
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ a │ change$action │ change$row_id │ change$is_update │
│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │
├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤
│ 3 │ DELETE │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
-- 空结果
SELECT * FROM s_append_only;
我们可以看到,两种流模式都能够捕获插入,以及在使用流之前对插入值进行的任何后续更新和删除。但是,使用后,如果对先前插入的数据进行更新或删除,则只有 standard 流能够捕获这些更改,将其记录为 DELETE 和 INSERT 操作。
对流消费的事务支持
在 Databend 中,流消费在单语句事务中具有事务性。这意味着:
成功事务:如果事务已提交,则流会被消费。例如:
INSERT INTO table SELECT * FROM stream;
如果此 INSERT
事务提交,则流会被消费。
失败事务:如果事务失败,则流保持不变,可供将来消费。
并发访问:一次只有一个事务可以成功消费一个流。如果多个事务尝试消费同一个流,则只有第一个提交的事务成功,其他事务失败。
流的表元数据
流不存储表的任何数据。在为表创建流之后,Databend 会向该表引入特定的隐藏元数据列,以用于更改跟踪。这些列包括:
| 列 | 描述