使用 bend-ingest-kafka 从 Kafka 加载数据
在本教程中,我们将指导您使用 Docker 设置 Kafka 环境,并使用 bend-ingest-kafka 将消息从 Kafka 加载到 Databend Cloud。
步骤 1:设置 Kafka 环境
在端口 9092 上运行 Apache Kafka Docker 容器:
MacBook-Air:~ eric$ docker run -d \
> --name kafka \
> -p 9092:9092 \
> apache/kafka:latest
Unable to find image 'apache/kafka:latest' locally
latest: Pulling from apache/kafka
690e87867337: Pull complete
5dddb19fae62: Pull complete
86caa4220d9f: Pull complete
7802c028acb4: Pull complete
16a3d1421c02: Pull complete
ab648c7f18ee: Pull complete
a917a90b7df6: Pull complete
4e446fc89158: Pull complete
f800ce0fc22f: Pull complete
a2e5e46262c3: Pull complete
Digest: sha256:c89f315cff967322c5d2021434b32271393cb193aa7ec1d43e97341924e57069
Status: Downloaded newer image for apache/kafka:latest
0261b8f3d5fde74f5f20340b58cb85d29d9b40ee4f48f1df2c41a68b616d22dc
步骤 2:创建 Topic 并生产消息
- 访问 Kafka 容器:
MacBook-Air:~ eric$ docker exec --workdir /opt/kafka/bin/ -it kafka sh
- 创建一个名为
test-topic
的新 Kafka topic:
/opt/kafka/bin $ ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
Created topic test-topic.
- 使用 Kafka 控制台生产者将消息发送到 test-topic:
/opt/kafka/bin $ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
- 输入 JSON 格式的消息:
{"id": 1, "name": "Alice", "age": 30}
{"id": 2, "name": "Bob", "age": 25}
- 完成后,使用 Ctrl+C 停止生产者。
步骤 3:在 Databend Cloud 中创建表
在 Databend Cloud 中创建目标表:
CREATE DATABASE doc;
CREATE TABLE databend_topic (
id INT NOT NULL,
name VARCHAR NOT NULL,
age INT NOT NULL
) ENGINE=FUSE;
步骤 4:安装并运行 bend-ingest-kafka
- 运行以下命令安装 bend-ingest-kafka 工具:
go install github.com/databendcloud/bend-ingest-kafka@latest