通过canal+kafka将mysql数据导入StarRocks
背景
在支持客户中,我们发现有一些客户公司已经存在一些数据通道,不允许业务直接消费MySQL Binlog,所有的数据消费都是从Kafka中获取,所以写这篇文档分享下如何消费Kafka中canal格式的数据写到到starrocks,实现CDC。 数据流向Mysql Binlog-->Canal-->kafka-->Flink SQL-->StarRocks
环境准备
1. Canal
关于canal相关的配置这里就不赘述了,建议大家可以参考使用canal从mysql同步binlog导入StarRocks安装和配置下canal以及依赖环境2. Flink
这里我们需要利用flink sql完成数据的读取和写入,所以需要大家安装flink服务。以下介绍单机版Flink安装教程。(如果公司已有flink集群服务,可跳过这一部分)- 下载 Flink, 推荐使用1.13,最低支持版本1.11。
- 下载 Flink CDC connector,请注意下载对应Flink版本的Flink-MySQL-CDC。
- 下载 Flink StarRocks connector,请注意1.13版本和1.11/1.12版本使用不同的connector.
- 下载Flink SQL Kafka connector,请注意下载Flink对应版本的connector,我这里下载的1.13.3版本flink-sql-connector-kafka_2.12-1.13.3.jar
- 复制 flink-sql-connector-kafka_2.12-1.13.3.jar,flink-sql-connector-mysql-cdc-xxx.jar,flink-connector-starrocks-xxx.jar 到 flink-xxx/lib/
cd flink-xxx ./bin/start-cluster.sh
DDL
Kafka中数据样例{ "data":[ { "id":"2f2192e9-f8b5-4332-a96f-192b05c9e6bc", "agent_id":"16", "http_port":"8031", "rpc_port":"9020", "query_port":"8306", "edit_log_port":"9010", "meta_dir":"", "absolute_meta_dir":"/home/disk1/sr/data/sr/meta", "log_dir":"", "absolute_log_dir":"/home/disk1/sr/starrocks-manager-20211008/fe-2f2192e9-f8b5-4332-a96f-192b05c9e6bc/log", "role":"FOLLOWER", "install_path":"/home/disk1/sr/starrocks-manager-20211008", "absolute_migrate_path":"/home/disk1/sr/app/StarRocks/SE/StarRocks-1.18.3/fe", "deleted":"0", "deleted_at":"0", "created_at":"1633759183484", "updated_at":"1634240355691" } ], "database":"test", "es":1634240355000, "id":1076, "isDdl":false, "mysqlType":{ "id":"varchar(48)", "agent_id":"int(11)", "http_port":"int(11)", "rpc_port":"int(11)", "query_port":"int(11)", "edit_log_port":"int(11)", "meta_dir":"text", "absolute_meta_dir":"text", "log_dir":"text", "absolute_log_dir":"text", "role":"varchar(32)", "install_path":"text", "absolute_migrate_path":"text", "deleted":"tinyint(1)", "deleted_at":"bigint(20)", "created_at":"bigint(20)", "updated_at":"bigint(20)" }, "old":[ { "updated_at":"1634240295633" } ], "pkNames":[ "id" ], "sql":"", "sqlType":{ "id":12, "agent_id":4, "http_port":4, "rpc_port":4, "query_port":4, "edit_log_port":4, "meta_dir":2005, "absolute_meta_dir":2005, "log_dir":2005, "absolute_log_dir":2005, "role":12, "install_path":2005, "absolute_migrate_path":2005, "deleted":-7, "deleted_at":-5, "created_at":-5, "updated_at":-5 }, "table":"fe_instances", "ts":1634240355886, "type":"UPDATE" }
StarRocks
create database canaltest; CREATE TABLE IF NOT EXISTS `canaltest`.`canal_test_sink` ( `id` STRING NOT NULL, `agent_id` int(11) NULL, `http_port` int(11) NULL, `rpc_port` int(11) NULL, `query_port` int(11), `edit_log_port` int(11), `meta_dir` STRING, `absolute_meta_dir` STRING, `log_dir` STRING, `absolute_log_dir` STRING, `role` varchar(32), `install_path` STRING, `absolute_migrate_path` STRING, `deleted` tinyint(1), `deleted_at` bigint(20), `created_at` bigint(20), `updated_at` bigint(20) ) ENGINE=OLAP PRIMARY KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`) BUCKETS 3 PROPERTIES ( "replication_num" = "3", "in_memory" = "false", "storage_format" = "DEFAULT" );Flink SQL 可以写到文件flink-create.sql中
CREATE DATABASE IF NOT EXISTS `testdb`; CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` ( `id` STRING NOT NULL, `agent_id` int NULL, `http_port` int NULL, `rpc_port` int NULL, `query_port` int, `edit_log_port` int, `meta_dir` STRING, `absolute_meta_dir` STRING, `log_dir` STRING, `absolute_log_dir` STRING, `role` varchar, `install_path` STRING, `absolute_migrate_path` STRING, `deleted` tinyint, `deleted_at` bigint, `created_at` bigint, `updated_at` bigint ) with ( 'connector' = 'kafka', 'topic' = 'canal_test', #kafka topic名字 'properties.bootstrap.servers' = '$kafka_host:9092', #kafka主机名 'properties.group.id' = 'canal_group', #kafka消费组 'format' = 'canal-json' -- 使用 canal-json 格式 ); CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` ( `id` STRING NOT NULL, `agent_id` int NULL, `http_port` int NULL, `rpc_port` int NULL, `query_port` int NULL, `edit_log_port` int, `meta_dir` STRING, `absolute_meta_dir` STRING, `log_dir` STRING, `absolute_log_dir` STRING, `role` STRING, `install_path` STRING, `absolute_migrate_path` STRING, `deleted` tinyint, `deleted_at` bigint, `created_at` bigint, `updated_at` bigint, PRIMARY KEY(`id`) NOT ENFORCED ) with ( 'load-url' = '$fe_host:8030', 'sink.properties.row_delimiter' = '\x02', 'username' = 'root', 'database-name' = 'canaltest', 'sink.properties.column_separator' = '\x01', 'jdbc-url' = 'jdbc:mysql://$fe_host:9030', 'password' = '', 'sink.buffer-flush.interval-ms' = '15000', 'connector' = 'starrocks', 'table-name' = 'canal_test_sink' #starrocks中的表名 ); INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
启动任务测试
cd flink-xxx ./bin/sql-client.sh -f flink-create.sql#查看任务状态
./bin/flink list#输出如下图表示正常启动 Waiting for response... ------------------ Running/Restarting Jobs ------------------- 18.03.2022 09:48:34 : 4a2c5035ca292fef9691524c731122c2 : insert-into_default_catalog.test.canal_test_sink (RUNNING) -------------------------------------------------------------- No scheduled jobs. 确认数据是否已经导入starrocks中 select * from canaltest.canal_test_sink;
常见问题排查
1. Flink任务没有报错的时候
第一步:确认binlog是否开启,可以通过 SHOW VARIABLES LIKE 'log_bin'查看; 第二步:确认flink、flink-cdc、flink-starrocks-connector和mysql版本(MySQL版本为5.7和8.0.X)是否满足要求,flink、flink-cdc和flink-starrocks-connector的大版本需要一致,例如都是1.13版本 第三步:逐步判断是查源表还是写starrocks的问题,这里利用下面的sql文件演示一下,该文件是上面生成步骤生成的flink-create.sql 安装的Flink目录下执行下面语句进入flink-sql bin/sql-client.sh 首先验证读取source表是否正常 #分别把上面的sql粘贴进来判断是查询源表的问题还是写入到starrocks的问题CREATE DATABASE IF NOT EXISTS `testdb`; CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` ( `id` STRING NOT NULL, `agent_id` int NULL, `http_port` int NULL, `rpc_port` int NULL, `query_port` int, `edit_log_port` int, `meta_dir` STRING, `absolute_meta_dir` STRING, `log_dir` STRING, `absolute_log_dir` STRING, `role` varchar, `install_path` STRING, `absolute_migrate_path` STRING, `deleted` tinyint, `deleted_at` bigint, `created_at` bigint, `updated_at` bigint ) with ( 'connector' = 'kafka', 'topic' = 'canal_test', #kafka topic名字 'properties.bootstrap.servers' = '$kafka_host:9092', 'properties.group.id' = 'canal_group', 'format' = 'canal-json' -- 使用 canal-json 格式 );#验证source是否正常
select * from `testdb`.`canal_test_source`再验证写入starrocks是否正常
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` ( `id` STRING NOT NULL, `agent_id` int NULL, `http_port` int NULL, `rpc_port` int NULL, `query_port` int NULL, `edit_log_port` int, `meta_dir` STRING, `absolute_meta_dir` STRING, `log_dir` STRING, `absolute_log_dir` STRING, `role` STRING, `install_path` STRING, `absolute_migrate_path` STRING, `deleted` tinyint, `deleted_at` bigint, `created_at` bigint, `updated_at` bigint, PRIMARY KEY(`id`) NOT ENFORCED ) with ( 'load-url' = '$fe_host:8030', 'sink.properties.row_delimiter' = '\x02', 'username' = 'root', 'database-name' = 'canaltest', 'sink.properties.column_separator' = '\x01', 'jdbc-url' = 'jdbc:mysql://$fe_host:9030', 'password' = '', 'sink.buffer-flush.interval-ms' = '15000', 'connector' = 'starrocks', 'table-name' = 'canal_test_sink' ); INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
2. Flink任务出错
第一步:确认flink集群是否有启动,可能有的同学本地下载的flink没有启动,需要./bin/start-cluster.sh启动下flink 第二步:根据具体的报错再具体分析