Amazon SQS (S3) 集成任务 (Beta)
本页介绍如何创建 Amazon SQS (S3) 集成任务,消费 SQS 队列中的 S3 对象创建事件,并将对应对象数据写入云平台。
该任务面向 S3 事件驱动的数据接入场景:上游系统向 S3 写入对象后,S3 将 ObjectCreated 事件发送到 SQS,云平台通过 AssumeRole 消费 SQS 消息,并基于事件中的存储桶和对象 key 将数据写入云平台。
如需先创建可复用的 SQS (S3) 连接配置,请参见 Amazon SQS (S3) - IAM Role。
适用场景
- 基于 S3
ObjectCreated事件自动接入新写入的 S3 对象 - 使用 S3 事件通知驱动数据导入,降低新文件到达后的处理延迟
- 避免仅通过轮询 S3 路径发现新文件
工作流程
- 上游系统向 S3 存储桶写入对象。
- S3 Event Notification 将
ObjectCreated事件发送到 SQS 标准队列。 - 云平台通过用户配置的 IAM Role 从 SQS 队列读取消息。
- 任务解析消息中的 S3 事件记录。
- 任务根据 S3 事件记录中的存储桶、对象 key 和文件格式写入云平台目标表。
- 写入成功后,任务从 SQS 队列删除已处理消息。
S3 事件通知和 SQS 标准队列都可能产生重复消息。云平台会处理失败重试;如果业务需要严格去重,请结合对象信息、事件时间、sequencer 或 SQS 消息 ID 设计下游去重逻辑。
前置条件
在创建 SQS (S3) 集成任务前,请确保:
- 已创建 Amazon SQS (S3) - IAM Role 数据源
- S3 存储桶已配置
ObjectCreated事件通知,并将事件发送到目标 SQS 队列 - SQS 队列策略允许 Amazon S3 执行
sqs:SendMessage - 用户 IAM Role 允许云平台角色通过
sts:AssumeRole访问 - 用户 IAM Role 具有读取目标 S3 对象和消费目标 SQS 队列的权限
- SQS 队列中保存的是标准 S3 Event Notification 消息格式
- S3 notification 的存储桶、prefix 和 suffix 与数据源配置保持一致
创建 SQS (S3) 集成任务
步骤 1:基本信息
- 前往 Data > Data Integration,点击 Create Task。
- 选择一个 SQS (S3) 数据源,然后配置基本参数:
| 字段 | 是否必填 | 说明 |
|---|---|---|
| Data Source | 是 | 从下拉列表中选择已有的 Amazon SQS (S3) - IAM Role 数据源 |
| Name | 是 | 当前集成任务名称 |
| File Format | 是 | S3 对象的文件格式,例如 CSV、Parquet 或 NDJSON |
| Object Key Prefix | 否 | 仅处理指定前缀的对象事件,例如 raw/events/。应与数据源和 S3 notification filter 保持一致 |
| Object Key Suffix | 否 | 仅处理指定后缀的对象事件,例如 .json 或 .parquet。应与数据源和 S3 notification filter 保持一致 |
建议优先在 S3 Event Notification 中配置前缀或后缀过滤,并与数据源和任务中的过滤条件保持一致,减少进入 SQS 的无关消息数量。
步骤 2:预览数据
完成基本设置后,点击 Next 预览源数据。
预览结果与 Amazon S3 集成任务 一致。系统会根据 SQS (S3) 配置定位对应的 S3 对象,读取文件内容并展示:
- 包含列名和数据类型的示例数据
- 匹配到的 S3 对象列表及其大小
如果当前路径范围内没有可预览的 S3 对象,预览页可能无法展示样例数据。您可以先向目标 S3 路径上传一个匹配 prefix / suffix 的测试对象后再重试预览。
步骤 3:设置目标表
配置云平台中的目标位置:
| 字段 | 说明 |
|---|---|
| Warehouse | 选择用于运行 SQS (S3) 集成任务的云平台 Warehouse |
| Target Database | 选择云平台中的目标数据库 |
| Target Table | 写入数据的目标表名 |
系统会根据预览到的 S3 对象内容推断列名和数据类型。继续之前,您可以检查并编辑目标表结构;如果写入已有表,请从现有表中选择目标表并确认列映射无误。
点击 Create 完成集成任务创建。
任务行为
SQS (S3) 集成任务是持续运行任务。启动后,它会周期性从 SQS 队列读取消息并写入目标表,直到被手动停止。
| 场景 | 行为 |
|---|---|
| 队列中有消息 | 读取消息,解析 S3 事件记录,并按事件中的对象信息写入目标表 |
| 写入成功 | 删除对应 SQS 消息,避免重复处理 |
| 写入失败 | 不删除对应 SQS 消息,保留消息用于后续重试 |
| 消息格式不符合 S3 Event Notification | 记录错误,并跳过或停止处理 |
| 手动停止任务 | 任务停止轮询,并保存当前运行状态 |
与 Amazon S3 集成任务的区别
| 任务类型 | 处理对象 | 写入云平台的内容 | 典型用途 |
|---|---|---|---|
| Amazon S3 集成任务 | S3 文件内容 | CSV、Parquet 或 NDJSON 文件中的业务数据 | 文件数据导入 |
| Amazon SQS (S3) 集成任务 | SQS 中的 S3 ObjectCreated 事件 | 事件对应的 S3 对象数据 | 新对象自动接入、事件驱动导入 |
如果您的目标是定期扫描某个 S3 路径并导入文件内容,请使用 Amazon S3 集成任务。如果您的目标是基于 S3 ObjectCreated 事件触发接入,请使用 Amazon SQS (S3) 集成任务。

