Loading from Kafka with databend-kafka-connect
在本教程中,我们将使用 Kafka Connect sink connector 插件 databend-kafka-connect 建立 Confluent Cloud 中 Kafka 和 Databend Cloud 之间的连接。然后,我们将演示如何生成消息并将其加载到 Databend Cloud 中。
Step 1: Setting up Kafka Environment
在开始之前,请确保您的 Kafka 环境已在 Confluent Cloud 中正确设置。
-
注册一个免费的 Confluent Cloud 帐户。注册并创建帐户后,登录到您的 Confluent Cloud 帐户。
-
按照 Confluent Quick Start 在您的默认环境中创建并启动一个基本的 Kafka 集群。
-
按照 Install Confluent CLI 指南在您的本地机器上安装 Confluent CLI。安装完成后,登录到您的 Confluent Cloud 帐户以连接到 Confluent Cloud:
confluent login --save
- 使用 Confluent CLI 创建一个 API 密钥,并将其设置为活动的 API 密钥。
confluent kafka cluster list
Current | ID | Name | Type | Cloud | Region | Availability | Network | Status
----------+------------+-----------+-------+-------+-----------+--------------+---------+---------
* | lkc-jr57j2 | cluster_0 | BASIC | aws | us-east-2 | | | UP
confluent api-key create --resource lkc-jr57j2
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+------------+------------------------------------------------------------------+
| API Key | <your-api-key> |
| API Secret | <your-api-secret> |
+------------+------------------------------------------------------------------+
confluent api-key use <your-api-key> --resource lkc-jr57j2
Step 2: Add Custom Connector Plugin
在此步骤中,您将 Kafka Connect sink connector 插件 databend-kafka-connect 上传到 Confluent Cloud。
-
从 GitHub repository 下载最新版本的 databend-kafka-connect。
-
在 Confluent Cloud 中,从导航菜单中,单击 Connectors > Add Connector > Add plugin。
-
填写插件详细信息,如下所示,然后上传 databend-kafka-connect 包。
Parameter | Description |
---|---|
Connector plugin name | 设置一个名称,例如 databend_plugin |
Custom plugin description | 描述插件,例如 Kafka Connect sink connector for Databend |
Connector class | com.databend.kafka.connect.DatabendSinkConnector |
Connector type | Sink |
Step 3: Create a Kafka Topic
在此步骤中,您将在 Confluent Cloud 中创建一个 Kafka topic。
-
在 Confluent Cloud 中,从导航菜单中,单击 Topics > Add topic。
-
设置 topic 名称,例如
databend_topic
,然后继续下一步。 -
选择 Create a schema for message values,然后单击 Create Schema。
- 在 Add new schema 页面上,选择 Avro 选项卡,然后复制以下 schema 并将其粘贴到编辑器中:
{
"doc": "Sample schema to help you get started.",
"fields": [
{
"doc": "The int type is a 32-bit signed integer.",
"name": "id",
"type": "int"
},
{
"doc": "The string is a unicode character sequence.",
"name": "name",
"type": "string"
},
{
"doc": "The string is a unicode character sequence.",
"name": "age",
"type": "int"
}
],
"name": "sampleRecord",
"type": "record"
}
Step 4: Add a Connector
在此步骤中,您将设置一个连接到 Databend Cloud 的 connector。
- 在 Confluent Cloud 中,从导航菜单中,单击 Connectors > Add Connector。搜索然后选择您上传的插件。
- 在 Kafka credentials 步骤中,选择 Use an existing API key,然后输入您使用 Confluent CLI 创建的 API 密钥和 secret。
- 在 Configuration 步骤中,选择 JSON 选项卡,然后复制以下配置并将其粘贴到编辑器中,将占位符替换为您的实际值:
{
"auto.create": "true",
"auto.evolve": "true",
"batch.size": "1",
"confluent.custom.schema.registry.auto": "true",
"connection.attempts": "3",
"connection.backoff.ms": "10000",
"connection.database": "<your-value>",
"connection.password": "<your-value>",
"connection.url": "jdbc:databend://<your-value>",
"connection.user": "cloudapp",
"errors.tolerance": "none",
"insert.mode": "upsert",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"max.retries": "10",
"pk.fields": "id",
"pk.mode": "record_value",
"table.name.format": "<your-value>.${topic}",
"topics": "databend_topic",
"value.converter": "io.confluent.connect.avro.AvroConverter"
}
-
在 Networking 步骤中,输入您的 Databend Cloud 计算集群 endpoint,例如
xxxxxxxxx--xxx.gw.aws-us-east-2.default.databend.com
。 -
在 Sizing 步骤中,将其设置为 1 task。
-
在 Review and launch 步骤中,设置一个名称,例如
databend_connector
。
Step 5: Produce Messages
在此步骤中,您将使用 Confluent CLI 生成消息,并验证它们是否已加载到 Databend Cloud 中。
- 在您的本地机器上,将用于创建 topic 的 schema 保存为 JSON 文件,例如
schema.json
。
{
"doc": "Sample schema to help you get started.",
"fields": [
{
"doc": "The int type is a 32-bit signed integer.",
"name": "id",
"type": "int"
},
{
"doc": "The string is a unicode character sequence.",
"name": "name",
"type": "string"
},
{
"doc": "The string is a unicode character sequence.",
"name": "age",
"type": "int"
}
],
"name": "sampleRecord",
"type": "record"
}
- 在 Confluent CLI 中,使用
confluent kafka topic produce <topic_name>
命令启动 Kafka producer,以将消息发送到您的 Kafka topic。
confluent kafka topic produce databend_topic --value-format avro --schema schema.json
Successfully registered schema with ID "100001".
Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
{"id":1, "name":"Alice", "age":30}
{"id":2, "name":"Bob", "age":25}
{"id":3, "name":"Charlie", "age":35}
- 在 Databend Cloud 中,验证数据是否已成功加载: