跳到主要内容

使用 Flink CDC 从 MySQL 迁移

在本教程中,我们将引导你完成使用 Apache Flink CDC 从 MySQL 迁移到 Databend Cloud 的过程。

开始之前

在开始之前,请确保你已准备好以下先决条件:

  • 你的本地机器上安装了 Docker,因为它将用于启动 MySQL。
  • 你的本地机器上安装了 Java 8 或 11,这是 Flink Databend Connector 所必需的。
  • 你的本地机器上安装了 BendSQL。有关如何使用各种包管理器安装 BendSQL 的说明,请参阅 安装 BendSQL

步骤 1:在 Docker 中启动 MySQL

  1. 创建一个名为 mysql.cnf 的配置文件,内容如下,并将此文件保存在将映射到 MySQL 容器的本地目录中,例如 /Users/eric/Downloads/mysql.cnf
[mysqld]
# Basic settings
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=3

# Character set settings
character_set_server=utf8mb4
collation-server=utf8mb4_unicode_ci

# Authentication settings
default-authentication-plugin=mysql_native_password
  1. 在你的本地机器上启动一个 MySQL 容器。下面的命令启动一个名为 mysql-server 的 MySQL 容器,创建一个名为 mydb 的数据库,并将 root 密码设置为 root
docker run \
--platform linux/amd64 \
--name mysql-server \
-v /Users/eric/Downloads/mysql.cnf:/etc/mysql/conf.d/custom.cnf \
-e MYSQL_ROOT_PASSWORD=root \
-e MYSQL_DATABASE=mydb \
-e MYSQL_ROOT_HOST=% \
-p 3306:3306 \
-d mysql:5.7
  1. 验证 MySQL 是否正在运行:
docker ps

检查输出中是否有名为 mysql-server 的容器:

CONTAINER ID   IMAGE       COMMAND                  CREATED        STATUS        PORTS                               NAMES
aac4c28be56e mysql:5.7 "docker-entrypoint.s…" 17 hours ago Up 17 hours 0.0.0.0:3306->3306/tcp, 33060/tcp mysql-server

步骤 2:使用示例数据填充 MySQL

  1. 登录到 MySQL 容器,并在出现提示时输入密码 root
docker exec -it mysql-server mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 71
Server version: 5.7.44-log MySQL Community Server (GPL)

Copyright (c) 2000, 2023, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
  1. 切换到 mydb 数据库:
mysql> USE mydb;
Database changed
  1. 复制并粘贴以下 SQL 以创建一个名为 products 的表并插入数据:
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));

ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");
  1. 验证数据:
mysql> select * from products;
+----+--------------------+---------------------------------------------------------+
| id | name | description |
+----+--------------------+---------------------------------------------------------+
| 10 | scooter | Small 2-wheel scooter |
| 11 | car battery | 12V car battery |
| 12 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |
| 13 | hammer | 12oz carpenter's hammer |
| 14 | hammer | 14oz carpenter's hammer |
| 15 | hammer | 16oz carpenter's hammer |
| 16 | rocks | box of assorted rocks |
| 17 | jacket | black wind breaker |
| 18 | cloud | test for databend |
| 19 | spare tire | 24 inch spare tire |
+----+--------------------+---------------------------------------------------------+
10 rows in set (0.01 sec)

步骤 3:在 Databend Cloud 中设置目标

  1. 使用 BendSQL 连接到 Databend Cloud。如果你不熟悉 BendSQL,请参阅本教程:使用 BendSQL 连接到 Databend Cloud

  2. 复制并粘贴以下 SQL 以创建一个名为 products 的目标表:

CREATE    TABLE products (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
  1. 下载并解压 Flink 1.17.1:
curl -O https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -xvzf flink-1.17.1-bin-scala_2.12.tgz
cd flink-1.17.1
  1. 将 Databend 和 MySQL 连接器下载到 lib 文件夹中:
curl -Lo lib/flink-connector-databend.jar https://github.com/databendcloud/flink-connector-databend/releases/latest/download/flink-connector-databend.jar

curl -Lo lib/flink-sql-connector-mysql-cdc-2.4.1.jar https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
  1. 打开 flink-1.17.1/conf/ 下的 flink-conf.yaml 文件,将 taskmanager.memory.process.size 更新为 4096m,然后保存该文件。
taskmanager.memory.process.size: 4096m
  1. 启动 Flink 集群:
./bin/start-cluster.sh

现在,如果你在浏览器中访问 http://localhost:8081,则可以打开 Apache Flink Dashboard:

Alt text

步骤 5:开始迁移

  1. 启动 Flink SQL Client:
./bin/sql-client.sh

你将看到 Flink SQL Client 启动横幅,确认客户端已成功启动。



▒▓██▓██▒ ▓████▒▒█▓▒▓███▓▒ ▓███▓░░ ▒▒▒▓██▒ ▒ ░██▒ ▒▒▓▓█▓▓▒░ ▒████ ██▒ ░▒▓███▒ ▒█▒█▒ ░▓█ ███ ▓░▒██ ▓█ ▒▒▒▒▒▓██▓░▒░▓▓█ █░ █ ▒▒░ ███▓▓█ ▒█▒▒▒ ████░ ▒▓█▓ ██▒▒▒ ▓███▒ ░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░ ▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒ ███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒ ░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒ ███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░ ██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓ ▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒ ▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒ ▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█ ██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █ ▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓ █▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓ ██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓ ▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒ ██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒ ▓█ ▒█▓ ░ █░ ▒█ █▓ █▓ ██ █░ ▓▓ ▒█▓▓▓▒█░ █▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█ ██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓ ▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██ ░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓ ░▓██▒ ▓░ ▒█▓█ ░░▒▒▒ ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░

______ _ _ _ _____ ____ _ _____ _ _ _ BETA | | () | | / |/ __ | | / | () | | | | | | _ __ | | __ | ( | | | | | | | | | ___ _ __ | | | | | | | ' | |/ / _ | | | | | | | | | |/ _ \ ' | | | | | | | | | | < ____) | || | |____ | || | | / | | | | || |||| ||_|_\ |/ ________| _||_|_|| ||__|

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


2. 将检查点间隔设置为 3 秒。

```bash
Flink SQL> SET execution.checkpointing.interval = 3s;
  1. 在 Flink SQL Client 中创建带有 MySQL 和 Databend 连接器的相应表(将占位符替换为您的实际值):
CREATE TABLE mysql_products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED)
WITH ('connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'mydb',
'table-name' = 'products',
'server-time-zone' = 'UTC'
);

CREATE TABLE databend_products (id INT,name String,description String, PRIMARY KEY (`id`) NOT ENFORCED)
WITH ('connector' = 'databend',
'url'='databend://cloudapp:{password}@{host}:443/{database}?warehouse={warehouse_name}&ssl=true',
'database-name'='{database}',
'table-name'='products',
'sink.batch-size' = '1',
'sink.flush-interval' = '1000',
'sink.ignore-delete' = 'false',
'sink.max-retries' = '3');
  1. 在 Flink SQL Client 中,将数据从 mysql_products 表同步到 databend_products 表:
Flink SQL> INSERT INTO databend_products SELECT * FROM mysql_products;
>
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5b505d752b7c211cbdcb5566175b9182

现在您可以在 Apache Flink Dashboard 中看到一个正在运行的作业:

Alt text

一切就绪!如果您返回到 BendSQL 终端并查询 Databend Cloud 中的 products 表,您将看到 MySQL 中的数据已成功同步:

SELECT * FROM products;

┌──────────────────────────────────────────────────────────────────────────────────────┐
│ id │ name │ description │
│ Int32 │ String │ Nullable(String)
├───────┼────────────────────┼─────────────────────────────────────────────────────────┤
18 │ cloud │ test for databend │
19 │ spare tire │ 24 inch spare tire │
16 │ rocks │ box of assorted rocks │
17 │ jacket │ black wind breaker │
14 │ hammer │ 14oz carpenter's hammer │
│ 15 │ hammer │ 16oz carpenter's hammer │
1212-pack drill bits │ 12-pack of drill bits with sizes ranging from #40 to #3 │
13 │ hammer │ 12oz carpenter's hammer │
10 │ scooter │ Small 2-wheel scooter │
11 │ car battery │ 12V car battery │
└──────────────────────────────────────────────────────────────────────────────────────┘
  1. 返回到 MySQL 终端并插入一个新产品:
INSERT INTO products VALUES (default, "bicycle", "Lightweight road bicycle");

接下来,在 BendSQL 终端中,再次查询 products 表以验证新产品是否已同步:

SELECT * FROM products;
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ id │ name │ description │
│ Int32 │ String │ Nullable(String) │
├───────┼────────────────────┼─────────────────────────────────────────────────────────┤
│ 12 │ 12-pack drill bits │ 12-pack of drill bits with sizes ranging from #40 to #3 │
│ 11 │ car battery │ 12V car battery │
│ 14 │ hammer │ 14oz carpenter's hammer │
│ 13 │ hammer │ 12oz carpenter's hammer │
│ 10 │ scooter │ Small 2-wheel scooter │
│ 20 │ bicycle │ Lightweight road bicycle │
│ 19 │ spare tire │ 24 inch spare tire │
│ 16 │ rocks │ box of assorted rocks │
│ 15 │ hammer │ 16oz carpenter's hammer │
│ 18 │ cloud │ test for databend │
│ 17 │ jacket │ black wind breaker │
└──────────────────────────────────────────────────────────────────────────────────────┘