使用任务自动化数据加载
任务封装了特定的 SQL 语句,这些语句旨在按预定的时间间隔执行、由特定事件触发,或作为更广泛任务序列的一部分执行。在 Databend Cloud 中,任务通常用于定期从流中捕获数据变化,例如新增的记录,然后将这些数据与指定的目标位置同步。此外,任务还支持 Webhook 和其他消息系统,便于在需要时发送错误消息和通知。
创建任务
本主题分解了在 Databend Cloud 中创建任务的步骤。在 Databend Cloud 中,您使用 CREATE TASK 命令创建任务。创建任务时,请按照下图设计工作流程:
-
为任务设置一个名称。
-
指定一个计算集群来运行任务。要创建计算集群,请参阅 使用计算集群。
-
确定如何触发任务运行。
- 您可以通过指定分钟或秒的间隔,或使用带有可选时区的 CRON 表达式来安排任务运行,以实现更精确的调度。
示例:
-- 此任务每 2 分钟运行一次
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
SCHEDULE = 2 MINUTE
AS ...
-- 此任务每天在亚洲/东京时区的午夜(本地时间)运行
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
SCHEDULE = USING CRON '0 0 0 * * *' 'Asia/Tokyo'
AS ...
- 或者,您可以在任务之间建立依赖关系,将任务设置为 有向无环图 中的子任务。
示例:
-- 此任务依赖于 DAG 中 'task_root' 任务的完成
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
AFTER task_root
AS ...
- 指定任务执行的条件,允许您根据布尔表达式选择性地控制任务执行。
示例:
-- 此任务每 2 分钟运行一次,仅在 'mystream' 包含数据变化时执行 AS 后的 SQL
CREATE TASK mytask
WAREHOUSE = 'default'
SCHEDULE = 2 MINUTE
// highlight-next-line
WHEN STREAM_STATUS('mystream') = TRUE
AS ...
- 指定任务出错时的处理方式 ,包括设置连续失败次数以暂停任务,并指定错误通知的通知集成。有关设置错误通知的更多信息,请参阅 配置通知集成。
示例:
-- 此任务在连续失败 3 次后将暂停
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS ...
-- 此任务将使用 'my_webhook' 集成进行错误通知。
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
ERROR_INTEGRATION = 'my_webhook'
AS ...
- 指定任务将执行的 SQL 语句。
示例:
-- 此任务每年更新 'employees' 表中的 'age' 列,将其增加 1。
CREATE TASK mytask
WAREHOUSE = 'default'
SCHEDULE = USING CRON '0 0 1 1 * *' 'UTC'
// highlight-next-line
AS
UPDATE employees
SET age = age + 1;