跳到主要内容

使用 Debezium 实时同步 MySQL

能力:CDC、全量
✅ 推荐:实时迁移并完整捕获变更

本教程将演示如何使用 Debezium 将 MySQL 数据同步到 Databend。请提前部署 Databend、MySQL 与 Debezium。

步骤 1:在 MySQL 中准备数据

创建数据库与表并插入示例数据:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water-proof black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

步骤 2:在 Databend 中创建数据库

只需创建对应数据库即可,无需建表:

CREATE DATABASE debezium;

步骤 3:创建 application.properties

创建文件 application.properties 并启动 debezium-server-databend。安装与启动方法见 Installing debezium-server-databend

首次启动时会按配置的 Batch Size 对 MySQL 数据进行全量同步,成功后即可在 Databend 中看到这些数据。

application.properties
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://<your-databend-host>:<port>
debezium.sink.databend.database.username=<your-username>
debezium.sink.databend.database.password=<your-password>
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

完成配置后即可在 Databend 查询 products 表,验证 MySQL 数据是否同步。随后在 MySQL 中执行插入、更新或删除,也会实时体现在 Databend 中。

欢迎体验 Databend Cloud

基于 Rust + 对象存储构建的新一代多模态数仓,一个平台即可进行 BI、向量、全文检索及地理空间分析。

支持标准 SQL,自动弹性伸缩,助您快速构建现代化数据平台。

注册即领 ¥200 代金券。

注册体验