Importing MySQL Data into StarRocks via Canal + Kafka
Background
While supporting customers, we found that some companies already have data pipelines in place and do not allow applications to consume the MySQL binlog directly: all data consumption goes through Kafka. This document shows how to consume Canal-format data from Kafka and write it into StarRocks, implementing CDC. Data flow: MySQL Binlog --> Canal --> Kafka --> Flink SQL --> StarRocks
Environment Setup
1. Canal
Canal configuration is not covered in detail here; we recommend following "Using Canal to sync MySQL binlog into StarRocks" to install and configure Canal and its dependencies.
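For orientation, here is a minimal sketch of the Kafka-delivery side of a Canal setup. This assumes a Canal 1.1.4-style configuration (keys differ across Canal versions); $kafka_host, $mysql_host, and the credentials are placeholders, and the topic name matches the rest of this article:

# conf/canal.properties: deliver parsed binlog to Kafka instead of TCP clients
canal.serverMode = kafka
canal.mq.servers = $kafka_host:9092

# conf/example/instance.properties: which MySQL instance to follow and which topic to write to
canal.instance.master.address = $mysql_host:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.mq.topic = canal_test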
2. Flink
We will use Flink SQL to read and write the data, so a Flink service is required. The following walks through a single-node Flink installation (skip this part if your company already runs a Flink cluster).
- Download Flink; version 1.13 is recommended, and 1.11 is the lowest supported version.
- Download the Flink CDC connector; make sure to download the Flink-MySQL-CDC build that matches your Flink version.
- Download the Flink StarRocks connector; note that Flink 1.13 and Flink 1.11/1.12 use different connectors.
- Download the Flink SQL Kafka connector that matches your Flink version; here we use flink-sql-connector-kafka_2.12-1.13.3.jar for Flink 1.13.3.
- Copy flink-sql-connector-kafka_2.12-1.13.3.jar, flink-sql-connector-mysql-cdc-xxx.jar, and flink-connector-starrocks-xxx.jar into flink-xxx/lib/, then sanity-check the result as shown below.
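After copying, you can confirm that all three connector jars actually landed in the lib directory (the xxx placeholders stand for your concrete versions):

ls flink-xxx/lib/ | grep -E 'kafka|cdc|starrocks'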
 
cd flink-xxx
./bin/start-cluster.sh
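If the cluster came up correctly, jps should show a StandaloneSessionClusterEntrypoint (JobManager) and a TaskManagerRunner (TaskManager) process, and the web UI should be reachable on the default port (8081, unless you changed it):

jps
# then open http://localhost:8081 in a browser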
DDL
Sample Canal message in Kafka:
{
"data":[
{
"id":"2f2192e9-f8b5-4332-a96f-192b05c9e6bc",
"agent_id":"16",
"http_port":"8031",
"rpc_port":"9020",
"query_port":"8306",
"edit_log_port":"9010",
"meta_dir":"",
"absolute_meta_dir":"/home/disk1/sr/data/sr/meta",
"log_dir":"",
"absolute_log_dir":"/home/disk1/sr/starrocks-manager-20211008/fe-2f2192e9-f8b5-4332-a96f-192b05c9e6bc/log",
"role":"FOLLOWER",
"install_path":"/home/disk1/sr/starrocks-manager-20211008",
"absolute_migrate_path":"/home/disk1/sr/app/StarRocks/SE/StarRocks-1.18.3/fe",
"deleted":"0",
"deleted_at":"0",
"created_at":"1633759183484",
"updated_at":"1634240355691"
}
],
"database":"test",
"es":1634240355000,
"id":1076,
"isDdl":false,
"mysqlType":{
"id":"varchar(48)",
"agent_id":"int(11)",
"http_port":"int(11)",
"rpc_port":"int(11)",
"query_port":"int(11)",
"edit_log_port":"int(11)",
"meta_dir":"text",
"absolute_meta_dir":"text",
"log_dir":"text",
"absolute_log_dir":"text",
"role":"varchar(32)",
"install_path":"text",
"absolute_migrate_path":"text",
"deleted":"tinyint(1)",
"deleted_at":"bigint(20)",
"created_at":"bigint(20)",
"updated_at":"bigint(20)"
},
"old":[
{
"updated_at":"1634240295633"
}
],
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":12,
"agent_id":4,
"http_port":4,
"rpc_port":4,
"query_port":4,
"edit_log_port":4,
"meta_dir":2005,
"absolute_meta_dir":2005,
"log_dir":2005,
"absolute_log_dir":2005,
"role":12,
"install_path":2005,
"absolute_migrate_path":2005,
"deleted":-7,
"deleted_at":-5,
"created_at":-5,
"updated_at":-5
},
"table":"fe_instances",
"ts":1634240355886,
"type":"UPDATE"
}
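Before wiring up Flink, you can confirm that Canal is actually producing messages like the one above by tailing the topic with Kafka's console consumer (broker address and topic name follow the placeholders used in this article):

bin/kafka-console-consumer.sh --bootstrap-server $kafka_host:9092 --topic canal_test --from-beginning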
StarRocks
create database canaltest;

CREATE TABLE IF NOT EXISTS `canaltest`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int(11) NULL,
    `http_port` int(11) NULL,
    `rpc_port` int(11) NULL,
    `query_port` int(11),
    `edit_log_port` int(11),
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` varchar(32),
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint(1),
    `deleted_at` bigint(20),
    `created_at` bigint(20),
    `updated_at` bigint(20)
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
);
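The DDL above runs inside StarRocks, for example through the MySQL client connected to the FE query port (using the same $fe_host placeholder as the rest of this article):

mysql -h $fe_host -P 9030 -u root
# then paste the CREATE DATABASE / CREATE TABLE statements above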
Flink SQL
The following statements can be saved to a file named flink-create.sql:

CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint
) with (
    'connector' = 'kafka',
    'topic' = 'canal_test',                               -- Kafka topic name
    'properties.bootstrap.servers' = '$kafka_host:9092',  -- Kafka broker address
    'properties.group.id' = 'canal_group',                -- Kafka consumer group
    'format' = 'canal-json'                               -- use the canal-json format
);

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int NULL,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint,
    PRIMARY KEY(`id`) NOT ENFORCED
) with (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
    'load-url' = '$fe_host:8030',
    'database-name' = 'canaltest',
    'table-name' = 'canal_test_sink',                     -- table name in StarRocks
    'username' = 'root',
    'password' = '',
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02',
    'sink.buffer-flush.interval-ms' = '15000'
);

INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
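Optionally, before submitting the INSERT you may want to enable checkpointing in the SQL client, since it is off by default and the job otherwise has no fault-tolerance snapshots; the interval here is only an example:

SET 'execution.checkpointing.interval' = '10s';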
Start the job and test
cd flink-xxx
./bin/sql-client.sh -f flink-create.sql

Check the job status:
./bin/flink list

Output like the following means the job started normally:

Waiting for response...
------------------ Running/Restarting Jobs -------------------
18.03.2022 09:48:34 : 4a2c5035ca292fef9691524c731122c2 : insert-into_default_catalog.test.canal_test_sink (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Confirm that the data has been loaded into StarRocks:

select * from canaltest.canal_test_sink;
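To confirm that changes keep flowing end to end, you can modify a row in the MySQL source table and watch the change arrive in StarRocks; the table name, id, and timestamp below are taken from the sample message above and are illustrative only:

-- run in MySQL
UPDATE test.fe_instances SET updated_at = 1634240360000
WHERE id = '2f2192e9-f8b5-4332-a96f-192b05c9e6bc';

-- run in StarRocks; the new updated_at should appear shortly after the sink flush interval (15s above)
select id, updated_at from canaltest.canal_test_sink
where id = '2f2192e9-f8b5-4332-a96f-192b05c9e6bc';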
Troubleshooting common issues
1. The Flink job shows no errors
Step 1: Confirm that binlog is enabled; you can check with SHOW VARIABLES LIKE 'log_bin';

Step 2: Confirm that the versions of flink, flink-cdc, flink-starrocks-connector, and MySQL (MySQL 5.7 or 8.0.x) meet the requirements; the major versions of flink, flink-cdc, and flink-starrocks-connector must match, e.g. all 1.13.

Step 3: Determine step by step whether the problem is reading the source table or writing to StarRocks. The statements below come from the flink-create.sql file generated in the steps above.

In the Flink installation directory, enter the Flink SQL client:

bin/sql-client.sh

First verify that reading the source table works; paste in the statements below one at a time to tell whether the problem is querying the source or writing to StarRocks.

CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint
) with (
    'connector' = 'kafka',
    'topic' = 'canal_test',                               -- Kafka topic name
    'properties.bootstrap.servers' = '$kafka_host:9092',
    'properties.group.id' = 'canal_group',
    'format' = 'canal-json'                               -- use the canal-json format
);

-- verify that the source works
select * from `testdb`.`canal_test_source`;

Then verify that writing to StarRocks works:
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int NULL,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint,
    PRIMARY KEY(`id`) NOT ENFORCED
) with (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
    'load-url' = '$fe_host:8030',
    'database-name' = 'canaltest',
    'table-name' = 'canal_test_sink',
    'username' = 'root',
    'password' = '',
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02',
    'sink.buffer-flush.interval-ms' = '15000'
);

INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
2. The Flink job fails with errors
Step 1: Confirm that the Flink cluster is started; a locally downloaded Flink may not be running yet, so start it with ./bin/start-cluster.sh.

Step 2: Analyze further based on the specific error message.
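For step 2, the JobManager and TaskManager logs are usually the quickest way to locate the root cause. A sketch, assuming Flink's default log file naming (the exact file names include your user name and host):

cd flink-xxx
tail -n 200 log/flink-*-standalonesession-*.log   # JobManager log
tail -n 200 log/flink-*-taskexecutor-*.log        # TaskManager log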