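Loading Data from Kafka with bend-ingest-kafka
In this tutorial, we'll walk you through setting up a Kafka environment with Docker and loading messages from Kafka into Databend Cloud with bend-ingest-kafka.
Step 1: Set Up the Kafka Environment
Run the Apache Kafka Docker container on port 9092: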
MacBook-Air:~ eric$ docker run -d \
> --name kafka \
> -p 9092:9092 \
> apache/kafka:latest
Unable to find image 'apache/kafka:latest' locally
latest: Pulling from apache/kafka
690e87867337: Pull complete
5dddb19fae62: Pull complete
86caa4220d9f: Pull complete
7802c028acb4: Pull complete
16a3d1421c02: Pull complete
ab648c7f18ee: Pull complete
a917a90b7df6: Pull complete
4e446fc89158: Pull complete
f800ce0fc22f: Pull complete
a2e5e46262c3: Pull complete
Digest: sha256:c89f315cff967322c5d2021434b32271393cb193aa7ec1d43e97341924e57069
Status: Downloaded newer image for apache/kafka:latest
0261b8f3d5fde74f5f20340b58cb85d29d9b40ee4f48f1df2c41a68b616d22dc
Step 2: Create a Topic and Produce Messages
- Open a shell in the Kafka container:
MacBook-Air:~ eric$ docker exec --workdir /opt/kafka/bin/ -it kafka sh
- Create a new Kafka topic named test-topic:
/opt/kafka/bin $ ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
Created topic test-topic.
- Use the Kafka console producer to send messages to test-topic:
/opt/kafka/bin $ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
- Enter the messages in JSON format, one per line:
{"id": 1, "name": "Alice", "age": 30}
{"id": 2, "name": "Bob", "age": 25}
- When you are done, press Ctrl+C to stop the producer.
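Typing messages by hand gets tedious beyond a few rows. The following is a minimal sketch, assuming Python 3 is available on the host; make_messages is a hypothetical helper for illustration and is not part of Kafka or bend-ingest-kafka. It prints one JSON object per line, in the same shape as the two messages above.

```python
import json


def make_messages(people):
    """Return one JSON string per (name, age) pair, with ids starting at 1."""
    return [
        json.dumps({"id": i, "name": name, "age": age})
        for i, (name, age) in enumerate(people, start=1)
    ]


if __name__ == "__main__":
    # Print one message per line so the output can be fed to a producer.
    for line in make_messages([("Alice", 30), ("Bob", 25)]):
        print(line)
```

Saved under a filename of your choice (gen_messages.py is used here only as an example), the output could be piped into the console producer from the host with something like: python3 gen_messages.py | docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic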
Step 3: Create a Table in Databend Cloud
Create the target table in Databend Cloud:
CREATE DATABASE doc;
CREATE TABLE doc.databend_topic (
id INT NOT NULL,
name VARCHAR NOT NULL,
age INT NOT NULL
) ENGINE=FUSE;
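Every column in this table is declared NOT NULL, so it can help to check messages against the column list before producing them. How bend-ingest-kafka treats malformed messages is not covered in this tutorial, so the sketch below is only a pre-flight check under that assumption. SCHEMA and validate_message are hypothetical names introduced for illustration.

```python
import json

# Expected fields and Python types, mirroring the columns of databend_topic.
SCHEMA = {"id": int, "name": str, "age": int}


def validate_message(line):
    """Return a list of problems found in one JSON message (empty if none)."""
    try:
        msg = json.loads(line)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(msg, dict):
        return ["message is not a JSON object"]
    problems = []
    for field, expected in SCHEMA.items():
        value = msg.get(field)
        if value is None:
            # Covers both an absent key and an explicit JSON null.
            problems.append(f"{field}: missing or null")
        elif isinstance(value, bool) or not isinstance(value, expected):
            # bool is a subclass of int in Python, so reject it explicitly.
            problems.append(f"{field}: expected {expected.__name__}")
    return problems
```

For example, validate_message('{"id": 2, "name": "Bob"}') reports that age is missing, while the two messages from Step 2 pass with an empty list.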
Step 4: Install and Run bend-ingest-kafka
- Install the bend-ingest-kafka tool by running the following command:
go install github.com/databendcloud/bend-ingest-kafka@latest
- Run the following command to load messages from the test-topic Kafka topic into the target table in Databend Cloud:
MacBook-Air:~ eric$ bend-ingest-kafka \
> --kafka-bootstrap-servers="localhost:9092" \
> --kafka-topic="test-topic" \
> --databend-dsn="<your-dsn>" \
> --databend-table="doc.databend_topic" \
> --data-format="json"
INFO[0000] Starting worker worker-0
WARN[0072] Failed to read message from Kafka: context deadline exceeded kafka_batch_reader=ReadBatch
2024/08/20 15:10:15 ingest 2 rows (1.225576 rows/s), 75 bytes (45.959100 bytes/s)
- Connect to Databend Cloud with BendSQL and verify that the data was loaded successfully:
Welcome to BendSQL 0.19.2-1e338e1(2024-07-17T09:02:28.323121000Z).
Connecting to tn3ftqihs--eric.gw.aws-us-east-2.default.databend.com:443 with warehouse eric as user cloudapp
Connected to Databend Query v1.2.626-nightly-a055124b65(rust-1.81.0-nightly-2024-08-27T15:49:08.376336236Z)
cloudapp@(eric)/doc> SELECT * FROM databend_topic;
SELECT * FROM databend_topic
-[ RECORD 1 ]-----------------------------------
id: 1
name: Alice
age: 30
-[ RECORD 2 ]-----------------------------------
id: 2
name: Bob
age: 25
- To load messages in RAW mode instead, run the following command:
bend-ingest-kafka \
--kafka-bootstrap-servers="localhost:9092" \
--kafka-topic="test-topic" \
--databend-dsn="<your-dsn>" \
--is-json-transform=false
This creates a new table in the doc database containing the following rows:
cloudapp@(eric)/doc> SELECT * FROM test_ingest;
SELECT * FROM test_ingest
-[ RECORD 1 ]-----------------------------------
uuid: 17f9e56e-19ba-4d42-88a0-e16b27815d04
koffset: 0
kpartition: 0
raw_data: {"age":30,"id":1,"name":"Alice"}
record_metadata: {"create_time":"2024-08-27T19:10:45.888Z","key":"","offset":0,"partition":0,"topic":"test-topic"}
add_time: 2024-08-27 19:12:55.081444
-[ RECORD 2 ]-----------------------------------
uuid: 0f57f71a-32ee-4df3-b75e-d123b9a91543
koffset: 1
kpartition: 0
raw_data: {"age":25,"id":2,"name":"Bob"}
record_metadata: {"create_time":"2024-08-27T19:10:52.946Z","key":"","offset":1,"partition":0,"topic":"test-topic"}
add_time: 2024-08-27 19:12:55.081470
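In RAW mode, the message body and its Kafka metadata arrive as JSON values in the raw_data and record_metadata columns, so downstream code has to unpack them. The sketch below assumes both values have been fetched as JSON strings, as they appear in the output above. decode_raw_row is a hypothetical helper and not part of bend-ingest-kafka or any Databend client.

```python
import json
from datetime import datetime


def decode_raw_row(raw_data, record_metadata):
    """Flatten one RAW-mode row into a dict of typed fields."""
    payload = json.loads(raw_data)
    meta = json.loads(record_metadata)
    # create_time ends in "Z"; swap it for an explicit UTC offset so that
    # datetime.fromisoformat also accepts it on Python versions before 3.11.
    created = datetime.fromisoformat(meta["create_time"].replace("Z", "+00:00"))
    return {
        "id": payload["id"],
        "name": payload["name"],
        "age": payload["age"],
        "topic": meta["topic"],
        "partition": meta["partition"],
        "offset": meta["offset"],
        "create_time": created,
    }


if __name__ == "__main__":
    # Values copied from RECORD 1 in the query output above.
    row = decode_raw_row(
        '{"age":30,"id":1,"name":"Alice"}',
        '{"create_time":"2024-08-27T19:10:45.888Z","key":"","offset":0,'
        '"partition":0,"topic":"test-topic"}',
    )
    print(row["name"], row["topic"], row["offset"])
```

Keeping the topic, partition, and offset next to the decoded fields makes it possible to trace any row back to the Kafka message it came from.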