Lakehouse ETL

Scenario: EverDrive Smart Vision's data engineering team exports each road-test batch as Parquet files so that unified workloads can load, query, and enrich the same telemetry data inside Databend.

EverDrive's ingestion loop is simple:

Object storage export (e.g., Parquet) → Stage → COPY INTO → (optional) Stream & Task

Adjust the bucket path and credentials (and swap Parquet for your actual format if it differs), then paste the commands below. All syntax matches the official data-loading guides.


1. Stage

EverDrive's data engineering team exports four files per batch (sessions, frame events, detection payloads with nested JSON fields, and frame embeddings) to an S3 bucket. This guide uses Parquet as the example; changing FILE_FORMAT is all it takes to load CSV, JSON, or any other supported format. Create the named connection once and reuse it for every stage that follows.

CREATE OR REPLACE CONNECTION everdrive_s3
STORAGE_TYPE = 's3'
ACCESS_KEY_ID = '<AWS_ACCESS_KEY_ID>'
SECRET_ACCESS_KEY = '<AWS_SECRET_ACCESS_KEY>';

CREATE OR REPLACE STAGE drive_stage
URL = 's3://everdrive-lakehouse/raw/'
CONNECTION = (CONNECTION_NAME = 'everdrive_s3')
FILE_FORMAT = (TYPE = 'PARQUET');

See Create Stage for more options.
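
If a batch arrives as CSV instead, only the FILE_FORMAT clause changes. A minimal sketch, assuming gzip-compressed CSV exports with a header row under a hypothetical raw-csv/ prefix (adjust the options to match your actual export):

CREATE OR REPLACE STAGE drive_stage_csv
URL = 's3://everdrive-lakehouse/raw-csv/' -- hypothetical prefix
CONNECTION = (CONNECTION_NAME = 'everdrive_s3')
FILE_FORMAT = (TYPE = 'CSV', SKIP_HEADER = 1, COMPRESSION = 'GZIP');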

List the exported folders (Parquet in this example) to confirm the files are visible:

LIST @drive_stage/sessions/;
LIST @drive_stage/frame-events/;
LIST @drive_stage/payloads/;
LIST @drive_stage/embeddings/;

2. Preview

Before loading, inspect the Parquet files to verify the schema and sample a few rows.

SELECT *
FROM @drive_stage/sessions/session_2024_08_16.parquet
LIMIT 5;

SELECT *
FROM @drive_stage/frame-events/frame_events_2024_08_16.parquet
LIMIT 5;

Repeat the preview for payloads and embeddings as needed. Databend automatically uses the file format declared on the stage.
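
To check column names and types without scanning data, the INFER_SCHEMA table function reads the file metadata directly; a quick sketch against the sessions file previewed above:

SELECT * FROM INFER_SCHEMA(location => '@drive_stage/sessions/session_2024_08_16.parquet');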


3. COPY INTO

Load each file into the tables used throughout this guide. Inline casts map input columns to table columns; the projections below use Parquet as the example, and other formats work the same way.

Sessions

COPY INTO drive_sessions (session_id, vehicle_id, route_name, start_time, end_time, weather, camera_setup)
FROM (
SELECT session_id::STRING,
vehicle_id::STRING,
route_name::STRING,
start_time::TIMESTAMP,
end_time::TIMESTAMP,
weather::STRING,
camera_setup::STRING
FROM @drive_stage/sessions/
)
FILE_FORMAT = (TYPE = 'PARQUET');

Frame Events

COPY INTO frame_events (frame_id, session_id, frame_index, captured_at, event_type, risk_score)
FROM (
SELECT frame_id::STRING,
session_id::STRING,
frame_index::INT,
captured_at::TIMESTAMP,
event_type::STRING,
risk_score::DOUBLE
FROM @drive_stage/frame-events/
)
FILE_FORMAT = (TYPE = 'PARQUET');

Detection Payloads

The payload files contain nested columns (the payload column is a JSON object). Copy them into the frame_payloads table using the same projection pattern.

COPY INTO frame_payloads (frame_id, run_stage, payload, logged_at)
FROM (
SELECT frame_id::STRING,
run_stage::STRING,
payload,
logged_at::TIMESTAMP
FROM @drive_stage/payloads/
)
FILE_FORMAT = (TYPE = 'PARQUET');
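
Once loaded, the JSON payload column can be queried with bracket path expressions. A hedged example, assuming hypothetical keys such as 'detections' and 'label' (substitute the keys your exporter actually writes):

SELECT frame_id,
payload['detections'][0]['label'] AS first_label -- hypothetical path
FROM frame_payloads
LIMIT 5;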

Frame Embeddings

COPY INTO frame_embeddings (frame_id, session_id, embedding, model_version, created_at)
FROM (
SELECT frame_id::STRING,
session_id::STRING,
embedding::VECTOR(4), -- replace 4 with your actual embedding dimension
model_version::STRING,
created_at::TIMESTAMP
FROM @drive_stage/embeddings/
)
FILE_FORMAT = (TYPE = 'PARQUET');
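
COPY INTO records which files it has already loaded and skips them on re-runs, so every statement above is safe to repeat. When finer control is needed, copy options can be appended; a sketch, assuming you want to restrict one run to a single day's files and skip malformed rows instead of aborting:

COPY INTO frame_events (frame_id, session_id, frame_index, captured_at, event_type, risk_score)
FROM (
SELECT frame_id::STRING,
session_id::STRING,
frame_index::INT,
captured_at::TIMESTAMP,
event_type::STRING,
risk_score::DOUBLE
FROM @drive_stage/frame-events/
)
PATTERN = '.*2024_08_16.*[.]parquet' -- hypothetical batch filter
FILE_FORMAT = (TYPE = 'PARQUET')
ON_ERROR = CONTINUE;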

Every downstream guide (analytics, search, vector, geo) can now see this batch of data.


4. Stream (optional)

If downstream jobs should notice new rows after each COPY INTO, create a stream on the key tables (such as frame_events). See Continuous Pipelines → Streams for usage.

CREATE OR REPLACE STREAM frame_events_stream ON TABLE frame_events;

SELECT * FROM frame_events_stream; -- shows rows added since the last consumption

After processing, run CONSUME STREAM frame_events_stream; (or insert the rows into another table) to advance the offset.
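
Inserting from the stream is the usual way to do this, since any DML statement that reads the stream consumes it. A minimal sketch, assuming a hypothetical downstream table frame_events_enriched with a compatible schema:

INSERT INTO frame_events_enriched
SELECT frame_id, session_id, frame_index, captured_at, event_type, risk_score
FROM frame_events_stream;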


5. Task (optional)

A task runs one SQL statement on a schedule. Create a small task per table (or call a stored procedure as a single entry point).

CREATE OR REPLACE TASK task_load_sessions
WAREHOUSE = 'default'
SCHEDULE = 5 MINUTE
AS
COPY INTO drive_sessions (session_id, vehicle_id, route_name, start_time, end_time, weather, camera_setup)
FROM (
SELECT session_id::STRING,
vehicle_id::STRING,
route_name::STRING,
start_time::TIMESTAMP,
end_time::TIMESTAMP,
weather::STRING,
camera_setup::STRING
FROM @drive_stage/sessions/
)
FILE_FORMAT = (TYPE = 'PARQUET');

ALTER TASK task_load_sessions RESUME;

CREATE OR REPLACE TASK task_load_frame_events
WAREHOUSE = 'default'
SCHEDULE = 5 MINUTE
AS
COPY INTO frame_events (frame_id, session_id, frame_index, captured_at, event_type, risk_score)
FROM (
SELECT frame_id::STRING,
session_id::STRING,
frame_index::INT,
captured_at::TIMESTAMP,
event_type::STRING,
risk_score::DOUBLE
FROM @drive_stage/frame-events/
)
FILE_FORMAT = (TYPE = 'PARQUET');

ALTER TASK task_load_frame_events RESUME;

-- Repeat for frame_payloads and frame_embeddings
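
If the fixed five-minute interval is too coarse, tasks also accept cron schedules. A sketch, assuming an hourly sessions load in UTC and a hypothetical task name (Databend cron expressions include a leading seconds field; verify against the Task guide linked below):

CREATE OR REPLACE TASK task_load_sessions_hourly -- hypothetical name
WAREHOUSE = 'default'
SCHEDULE = USING CRON '0 0 * * * *' 'UTC' -- second 0, minute 0, every hour
AS
COPY INTO drive_sessions (session_id, vehicle_id, route_name, start_time, end_time, weather, camera_setup)
FROM (
SELECT session_id::STRING,
vehicle_id::STRING,
route_name::STRING,
start_time::TIMESTAMP,
end_time::TIMESTAMP,
weather::STRING,
camera_setup::STRING
FROM @drive_stage/sessions/
)
FILE_FORMAT = (TYPE = 'PARQUET');

ALTER TASK task_load_sessions_hourly RESUME;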

See Continuous Pipelines → Tasks for cron syntax, dependencies, and error handling.