使用 databend-kafka-connect 从 Kafka 加载数据
在本教程中,我们将使用 Kafka Connect sink connector 插件 databend-kafka-connect 建立 Confluent Cloud 中的 Kafka 与 Databend Cloud 之间的连接。然后,我们将演示如何生成消息并将其加载到 Databend Cloud 中。
步骤 1:设置 Kafka 环境
在开始之前,请确保您的 Kafka 环境已在 Confluent Cloud 中正确设置。
注册一个免费的 Confluent Cloud 账户。注册并创建账户后,登录 到您的 Confluent Cloud 账户。
按照 Confluent 快速入门 创建并启动默认环境中的基本 Kafka 集群。
按照 安装 Confluent CLI 指南在本地机器上安装 Confluent CLI。安装后,登录到您的 Confluent Cloud 账户以连接到 Confluent Cloud:
confluent login --save
- 使用 Confluent CLI 创建一个 API 密钥,并将其设置为活动 API 密钥。
confluent kafka cluster list
Current | ID | Name | Type | Cloud | Region | Availability | Network | Status
* | lkc-jr57j2 | cluster_0 | BASIC | aws | us-east-2 | | | UP
confluent api-key create --resource lkc-jr57j2
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
| API Key | <your-api-key> |
| API Secret | <your-api-secret> |
confluent api-key use <your-api-key> --resource lkc-jr57j2
步骤 2:添加自定义连接器插件
在这一步中,您将上传 Kafka Connect sink connector 插件 databend-kafka-connect 到 Confluent Cloud。
从 GitHub 仓库 下载最新版本的 databend-kafka-connect。
在 Confluent Cloud 中,从导航菜单点击 Connectors > Add Connector > Add plugin。
填写插件详情如下,然后上传 databend-kafka-connect 包。
参数 | 描述 |
Connector plugin name | 设置一个名称,例如 databend_plugin |
Custom plugin description | 描述插件,例如 Kafka Connect sink connector for Databend |
Connector class | com.databend.kafka.connect.DatabendSinkConnector |
Connector type | Sink |
步骤 3:创建 Kafka 主题
在这一步中,您将在 Confluent Cloud 中创建一个 Kafka 主题。
在 Confluent Cloud 中,从导航菜单点击 Topics > Add topic。
,然后继续下一步。 -
选择 Create a schema for message values,然后点击 Create Schema。
- 在 Add new schema 页面,选择 Avro 标签,然后将以下模式复制并粘贴到编辑器中:
"doc": "Sample schema to help you get started.",
"fields": [
"doc": "The int type is a 32-bit signed integer.",
"name": "id",
"type": "int"
"doc": "The string is a unicode character sequence.",
"name": "name",
"type": "string"
"doc": "The string is a unicode character sequence.",
"name": "age",
"type": "int"
"name": "sampleRecord",
"type": "record"
步骤 4:添加连接器
在这一步中,您将设置一个连接到 Databend Cloud 的连接器。
- 在 Confluent Cloud 中,从导航菜单点击 Connectors > Add Connector。搜索并选择您上传的插件。
- 在 Kafka credentials 步骤中,选择 Use an existing API key,然后输入您使用 Confluent CLI 创建的 API 密钥和密钥。
- 在 Configuration 步骤中,选择 JSON 标签,然后将以下配置复制并粘贴到编辑器中,用您的实际值替换占位符:
"auto.create": "true",
"auto.evolve": "true",
"batch.size": "1",
"confluent.custom.schema.registry.auto": "true",
"connection.attempts": "3",
"connection.backoff.ms": "10000",
"connection.database": "<your-value>",
"connection.password": "<your-value>",
"connection.url": "jdbc:databend://<your-value>",
"connection.user": "cloudapp",
"errors.tolerance": "none",
"insert.mode": "upsert",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"max.retries": "10",
"pk.fields": "id",
"pk.mode": "record_value",
"table.name.format": "<your-value>.${topic}",
"topics": "databend_topic",
"value.converter": "io.confluent.connect.avro.AvroConverter"
在 Networking 步骤中,输入您的 Databend Cloud 计算集群端点,例如
。 -
在 Sizing 步骤中,设置为 1 task。
在 Review and launch 步骤中,设置一个名称,例如
步骤 5:生成消息
在这一步中,您将使用 Confluent CLI 生成消息,并验证它们是否已加载到 Databend Cloud 中。
- 在本地机器上,将用于创建主题的模式保存为 JSON 文件,例如
"doc": "Sample schema to help you get started.",
"fields": [
"doc": "The int type is a 32-bit signed integer.",
"name": "id",
"type": "int"
"doc": "The string is a unicode character sequence.",
"name": "name",
"type": "string"
"doc": "The string is a unicode character sequence.",
"name": "age",
"type": "int"
"name": "sampleRecord",
"type": "record"
- 在 Confluent CLI 中,使用
confluent kafka topic produce <topic_name>
命令启动 Kafka 生产者,向您的 Kafka 主题发送消息。
confluent kafka topic produce databend_topic --value-format avro --schema schema.json
Successfully registered schema with ID "100001".
Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
{"id":1, "name":"Alice", "age":30}
{"id":2, "name":"Bob", "age":25}
{"id":3, "name":"Charlie", "age":35}
- 在 Databend Cloud 中,验证数据是否已成功加载: