跳到主要内容

Kafka

Apache Kafka is an open-source distributed event streaming platform that allows you to publish and subscribe to streams of records. It is designed to handle high-throughput, fault-tolerant, and real-time data feeds. Kafka enables seamless communication between various applications, making it an ideal choice for building data pipelines and streaming data processing applications.

Databend provides an efficient data ingestion tool (bend-ingest-kafka), specifically designed to load data from Kafka into Databend. With this tool, you can seamlessly transfer data from Kafka topics and insert it into your target Databend table, streamlining your data ingestion workflow.

Installing bend-ingest-kafka

To install the tool, make sure you have Go programming language installed on your computer, and then use the "go get" command as shown:

go get https://github.com/databendcloud/bend-ingest-kafka

Usage Example

This section assumes your data in Kafka appears as follows and explains how to load data into Databend using the tool bend-ingest-kafka.

{
"employee_id": 10,
"salary": 30000,
"rating": 4.8,
"name": "Eric",
"address": "123 King Street",
"skills": ["Java", "Python"],
"projects": ["Project A", "Project B"],
"hire_date": "2011-03-06",
"last_update": "2016-04-04 11:30:00"
}

Step 1. Create a Table in Databend

Before ingesting data, you need to create a table in Databend that matches the structure of your Kafka data.

CREATE TABLE employee_data (
employee_id Int64,
salary UInt64,
rating Float64,
name String,
address String,
skills Array(String),
projects Array(String),
hire_date Date,
last_update DateTime
);

Step 2. Run bend-ingest-kafka

Once the table is created, execute the bend-ingest-kafka command with the required parameters to initiate the data loading process. The command will start the data ingester, which continuously monitors your Kafka topic, consumes the data, and inserts it into the specified table in Databend.

bend-ingest-kafka \
--kafka-bootstrap-servers="127.0.0.1:9092,127.0.0.2:9092" \
--kafka-topic="Your Topic" \
--kafka-consumer-group="Consumer Group" \
--databend-dsn="http://root:root@127.0.0.1:8000" \
--databend-table="default.employee_data" \
--data-format="json" \
--batch-size=100000 \
--batch-max-interval=300s
ParameterDescription
--kafka-bootstrap-serversComma-separated list of Kafka bootstrap servers to connect to.
--kafka-topicThe Kafka topic from which the data will be ingested.
--kafka-consumer-groupThe consumer group for Kafka consumer to join.
--databend-dsnThe Data Source Name (DSN) to connect to Databend. Format: http(s)://username:password@host:port.
--databend-tableThe target Databend table where the data will be inserted.
--data-formatThe format of the data being ingested.
--batch-sizeThe number of records per batch during ingestion.
--batch-max-intervalThe maximum interval (in seconds) to wait before flushing a batch.
这篇文章对您有帮助吗?
Yes
No
开始使用 Databend Cloud
低成本
快速分析
多种数据源
弹性扩展
注册