使用StarRocks内置工具Routine Load同步Mysql/TIDB/PG等增量更新数据到StarRocks
什么是StarRocks?
StarRocks是新一代极速统一的olap新型mpp分析型数据库,全面向量化引擎,全新的CBO优化器,性能强悍,单表查询媲美业界最强悍的clickhouse,支持多表join,支持数据秒级更新;
且同时支持高并发,架构简单,方便运维扩展,完全国产,安全可控,在国内外各行各业已经得到了广泛使用。
StarRocks提供了丰富的数据接入方式:stream load,routine load,broker load,spark load等,对接比如本地文件,对象存储,hdfs,数据库,kafka,还可以使用flink cdc方式同步数据到starrocks,
也支持开源工具比如datax,seatunnel等,也定制了flink connector source/sink 到starrocks。具体可以参考官网文档:https://docs.starrocks.com/zh-cn/main/loading/Loading_intro
本文示例如何通过Routine load工具通过kafka将TP类型的增量数据方便快捷同步到StarRocks中(除了下文使用到的方法,也可以使用flink cdc借助flink sql同步)
Routine Load原理:
导入流程如上图:
- 用户通过支持MySQL协议的客户端向 FE 提交一个Kafka导入任务。
- FE将一个导入任务拆分成若干个Task,每个Task负责导入指定的一部分数据。
- 每个Task被分配到指定的 BE 上执行。在 BE 上,一个 Task 被视为一个普通的导入任务, 通过 Stream Load 的导入机制进行导入。
- BE导入完成后,向 FE 汇报。
- FE 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
- FE 会不断的产生新的 Task,来完成数据不间断的导入。
实验环境: Mysql +Canal + Kafka + StarRocks
测试步骤:
1.开启mysql binlog
确认mysql binlog已经开启:
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=2 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
也可以在mysql中通过show variables like '%xxx%'方式确认相关配置已经开启;
2.配置好canal环境,使数据sink到kakfa
配置两个文件,conf/canal.properties, conf/exmaple/instance.properties,启动canal;
3.准备好StarRocks集群
方便测试一个节点就可以,生产环境推荐至少3台服务器以上,分布式部署,多副本,保障数据的不丢失以及服务的高可用;
4.建好kafka topic
kafka中数据格式:
{"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790506000,"id":19,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790506948,"type":"INSERT"} {"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790577000,"id":20,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790577916,"type":"DELETE"} {"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790789000,"id":21,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790797431,"type":"INSERT"} {"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790832000,"id":22,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790832760,"type":"DELETE"} {"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791354000,"id":23,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791354904,"type":"INSERT"} {"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791385000,"id":24,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791395247,"type":"DELETE"}
可以看到在mysql binlog输出到kafka中的json数据,后面都会有一个type字段,类型为insert,update or delete,StarRocks正是通过去解析这个字段类型,来做后续在内部的添加,更新,删除数据。
5.建好routine load job
create routine load gong.cdc0401 on cdc_0401 columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END)) PROPERTIES ( "format"="json", "jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]", "desired_concurrent_number"="1", "max_error_number"="1000", "max_batch_interval"="5", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092", "kafka_topic" = "gong_test", "kafka_partitions"="0", "kafka_offsets"="OFFSET_BEGINNING" );
需要注意的是,columns和jsonpath部分较容易弄错,参考StarRocks 论坛文章:https://forum.starrocks.com/t/topic/851
6.验证:测试,在mysql中insert,update,delete数据,是否同步到starrocks
mysql中插入数据,再删除:
MariaDB [gong]> insert into gong_cdc values(98777777,"987",9888888); Query OK, 1 row affected (0.00 sec) MariaDB [gong]> delete from gong_cdc where id = 98777777; Query OK, 1 row affected (0.00 sec)
mysql> select * from cdc_0401; +----------+-----------+---------+ | id | k1 | v1 | +----------+-----------+---------+ | 321 | 3321 | 321 | | 444 | main | 1 | | 666 | starrocks | 666 | | 777 | 777 | 777 | | 888 | 777 | 888 | | 987 | 987 | 987 | | 10086 | cheng | 1 | | 11111 | sr | 1 | | 30003 | sr | 30 | | 88888 | 88888 | 8888 | | 100002 | march | 1 | | 100003 | gong | 1 | | 200000 | cheng | 1 | | 98777777 | 987 | 9888888 | +----------+-----------+---------+ 14 rows in set (0.01 sec)
mysql> show routine load\G; *************************** 1. row *************************** Id: 10252 Name: cdc0401 CreateTime: 2022-04-01 17:01:15 PauseTime: NULL EndTime: NULL DbName: default_cluster:gong TableName: cdc_0401 State: RUNNING DataSourceType: KAFKA CurrentTaskNum: 1 JobProperties: {"partitions":"*","columnToColumnExpr":"id,k1,v1,temp,__op=(CASE `temp` WHEN 'DELETE' THEN 1 ELSE 0 END)","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]","desireTaskConcurrentNum":"1","maxErrorNum":"100000","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"} DataSourceProperties: {"topic":"gong_test","currentKafkaPartitions":"0","brokerList":"cs01:9092,cs02:9092,cs03:9092"} CustomProperties: {} Statistic: {"receivedBytes":1787751,"errorRows":3001,"committedTaskNum":3,"loadedRows":41,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":3042,"unselectedRows":0,"receivedBytesRate":198000,"taskExecuteTimeMs":9028} Progress: {"0":"3041"} ReasonOfStateChanged: ErrorLogUrls: http://172.26.194.184:29122/api/_load_error_log?file=__shard_5/error_log_insert_stmt_5752f798-7efa-47d8-b7ba-fcbc08dcfad5_5752f7987efa47d8_b7bafcbc08dcfad5 OtherMsg: 1 row in set (0.00 sec) ERROR: No query specified
mysql> select * from cdc_0401; +--------+-----------+------+ | id | k1 | v1 | +--------+-----------+------+ | 321 | 3321 | 321 | | 444 | main | 1 | | 666 | starrocks | 666 | | 777 | 777 | 777 | | 888 | 777 | 888 | | 987 | 987 | 987 | | 10086 | cheng | 1 | | 11111 | sr | 1 | | 30003 | sr | 30 | | 88888 | 88888 | 8888 | | 100002 | march | 1 | | 100003 | gong | 1 | | 200000 | cheng | 1 | +--------+-----------+------+ 13 rows in set (0.00 sec)
查看starrocks的数据,确实先进来,后删除了,同时也可以随时查看routie load job运行状况,确保任务没有异常,这样子数据才能同步进来;
7.测试过程碰到的问题
问题一:创建routine load方式如下
create routine load gong.cdc0401 on cdc_0401 columns(id,k1,v1) PROPERTIES ( "format"="json", "desired_concurrent_number"="1", "max_error_number"="100", "max_batch_interval"="5", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092", "kafka_topic" = "gong_test", "kafka_partitions"="0", "kafka_offsets"="OFFSET_BEGINNING" );
发现不管如何测试,在mysql中的insert和update都可以同步到starrocks中,一度以为starrocks官网写得增量同步,只是同步新增或变更的数据,删除不了;
参考StarRoks论坛文档链接:https://docs.starrocks.com/zh-cn/main/loading/Json_loading
(当前官网在这一块讲叙得不是很详细;)
问题二:__op字段配置的问题
1. columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END)) 2. "jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]",这两处配置是需要格外注意的地方,可以参考官网论坛链接:https://forum.starrocks.com/t/topic/851
字段配置错误,报错:
Reason: column count mismatch, expect=4 real=1. src line: [{"data":[{"id":"88888","k1":"88888","v1":"8888"}],"database":"gong","es":1648798201000,"id":30,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648798212928,"type":"INSERT"}];
3.由于数据质量等问题引起的null情况,需要配置参数"max_error_number"="100",可以配置为一个较大的值,否则routine load任务会paused
4.在建routine load任务时候,对应字段反引号``引起来了,会报错id取值为空,其他字段取到的值也都为空情况,定位了很久,应该是当前routine load作业的一个小bug,版本2.1.x:
需要将反引号``符号去掉: