Flink CDC case study: reading MySQL and writing to Kafka (com.ververica)
This example uses the com.ververica version of flink-connector-mysql-cdc, which fixes two problems of the com.alibaba version:
1) it avoids locking the source tables;
2) it avoids the checkpoint errors that can occur when StartupOptions.latest() is set.
For these reasons the alibaba version is not recommended.
The POM for the module is shown below; pay particular attention to the com.ververica flink-connector-mysql-cdc dependency (version 2.1.1).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall-flink-2021</artifactId>
        <groupId>com.king</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-flink-cdc-ververica</artifactId>
    <version>1.0.0</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.7</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.17.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <configuration>
                            <outputDirectory>${project.build.directory}/bin/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.Application</mainClass>
                        </manifest>
                    </archive>
                    <outputDirectory>${project.build.directory}/bin</outputDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Here is the main program, FlinkCdcWithVerverica:
package com.king.app

import com.king.config.{DBServerConstant, StateBackendConfig}
import com.king.function.CustomerDeseriallization
import com.king.util.MyKafkaUtil
import com.ververica.cdc.connectors.mysql.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * @Author: KingWang
 * @Date: 2022/1/15
 * @Desc:
 **/
object FlinkCdcWithVerverica {

  def main(args: Array[String]): Unit = {

    //1. Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //1.1 Enable checkpointing with a filesystem state backend
    env.setStateBackend(new FsStateBackend(StateBackendConfig.getFileCheckPointDir("cdc_ververica")))
    env.enableCheckpointing(30000L) // start-to-start interval: trigger a checkpoint every 30 seconds
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // env.getCheckpointConfig.setCheckpointTimeout(10000L)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000L) // end-to-start pause: at least 10 seconds between checkpoints
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L))

    //2. Build the SourceFunction with Flink CDC and read the data
    val dbServer = DBServerConstant.mysql_gmall_flink()
    val sourceFunction = MySqlSource.builder[String]()
      .hostname(dbServer.hostname)
      .port(dbServer.port)
      .username(dbServer.username)
      .password(dbServer.password)
      .databaseList("gmall-210325-flink")
      // If tableList is not set, all tables of the listed databases are captured.
      // If it is set, entries must use the database.table format; separate multiple tables with commas.
      .tableList("gmall-210325-flink.base_trademark")
      // .deserializer(new StringDebeziumDeserializationSchema())
      .deserializer(new CustomerDeseriallization())
      // Startup modes:
      // 1. initial        take a full snapshot of the tables first, then read the binlog
      // 2. earliest       read from the earliest available binlog offset
      // 3. latest         read only changes that arrive after the job starts
      // 4. specificOffset read from a specified binlog offset
      // 5. timestamp      read changes newer than the specified timestamp
      .startupOptions(StartupOptions.latest())
      .build()

    val dataStream = env.addSource(sourceFunction)

    //3. Sink
    dataStream.print()
    dataStream.addSink(MyKafkaUtil.getKafkaProducer("test"))

    //4. Start the job
    env.execute("flink-cdc")
  }
}
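The main program relies on three helper classes that are not shown in this article: DBServerConstant and StateBackendConfig (which only supply connection details and the checkpoint directory) and MyKafkaUtil. Below is a minimal sketch of MyKafkaUtil, assuming a plain-string FlinkKafkaProducer; the broker address hadoop102:9092 is a placeholder, not taken from the original project.

package com.king.util

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object MyKafkaUtil {

  // Placeholder broker list; replace with your own Kafka cluster address.
  private val brokerList = "hadoop102:9092"

  // Returns a string producer for the given topic, matching the call in the main program.
  def getKafkaProducer(topic: String): FlinkKafkaProducer[String] = {
    new FlinkKafkaProducer[String](brokerList, topic, new SimpleStringSchema())
  }
}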
And the custom output format, CustomerDeseriallization:
package com.king.function

import com.alibaba.fastjson.JSONObject
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{Schema, Struct}
import org.apache.kafka.connect.source.SourceRecord

/**
 * @Author: KingWang
 * @Date: 2021/12/29
 * @Desc:
 **/
class CustomerDeseriallization extends DebeziumDeserializationSchema[String] {

  /**
   * Shape of the emitted record:
   * {
   *   "database":"",
   *   "tableName":"",
   *   "type":"c r u d",
   *   "before":"",
   *   "after":"",
   *   "ts": ""
   * }
   *
   * @param sourceRecord
   * @param collector
   */
  override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {
    //1. Create a JSON object to hold the final record
    val result = new JSONObject()

    val value: Struct = sourceRecord.value().asInstanceOf[Struct]

    //2. Get the database and table names
    val source: Struct = value.getStruct("source")
    val database = source.getString("db")
    val table = source.getString("table")

    //3. Get the before image
    val before = value.getStruct("before")
    val beforeObj = if (before != null) getJSONObjectBySchema(before.schema(), before) else new JSONObject()

    //4. Get the after image
    val after = value.getStruct("after")
    val afterObj = if (after != null) getJSONObjectBySchema(after.schema(), after) else new JSONObject()

    //5. Get the operation type
    val op: String = value.getString("op")

    //6. Get the operation timestamp
    val ts = source.getInt64("ts_ms")
    // val ts = value.getInt64("ts_ms")

    //7. Assemble the result
    result.put("database", database)
    result.put("table", table)
    result.put("type", op)
    result.put("before", beforeObj)
    result.put("after", afterObj)
    result.put("ts", ts)

    collector.collect(result.toJSONString)
  }

  override def getProducedType: TypeInformation[String] = {
    BasicTypeInfo.STRING_TYPE_INFO
  }

  def getJSONObjectBySchema(schema: Schema, struct: Struct): JSONObject = {
    val fields = schema.fields()
    val jsonBean = new JSONObject()
    val iter = fields.iterator()
    while (iter.hasNext) {
      val field = iter.next()
      val key = field.name()
      val value = struct.get(field)
      jsonBean.put(key, value)
    }
    jsonBean
  }
}
Here the job is configured to read from the latest position each time, via StartupOptions.latest(). Run it and then perform the three operations below (an illustrative sketch of the emitted records follows the list):
Insert a row:
Update a row:
Delete a row:
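For reference, the records printed to the console and sent to the test topic by CustomerDeseriallization look roughly like the following (an illustrative sketch with made-up values, assuming base_trademark has id and tm_name columns; Debezium reports the operation type as c for insert, u for update and d for delete):

Insert: {"database":"gmall-210325-flink","table":"base_trademark","type":"c","before":{},"after":{"id":12,"tm_name":"test"},"ts":1642230000000}
Update: {"database":"gmall-210325-flink","table":"base_trademark","type":"u","before":{"id":12,"tm_name":"test"},"after":{"id":12,"tm_name":"test2"},"ts":1642230060000}
Delete: {"database":"gmall-210325-flink","table":"base_trademark","type":"d","before":{"id":12,"tm_name":"test2"},"after":{},"ts":1642230120000}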
With that, the example is complete.
In practice, the steps usually are:
1. On the first run, use StartupOptions.initial() to synchronize all existing data;
2. then switch to latest() to read only the newest changes, with checkpointing enabled so that the job can recover from a checkpoint after a failure (see the sketch after this list).
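A minimal sketch of that switch, reusing DBServerConstant and CustomerDeseriallization from the code above (the factory object CdcSourceFactory and its method names are hypothetical, introduced only for illustration): only the startupOptions argument changes between the two runs.

package com.king.app

import com.king.config.DBServerConstant
import com.king.function.CustomerDeseriallization
import com.ververica.cdc.connectors.mysql.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.DebeziumSourceFunction

object CdcSourceFactory {

  // Builds the same source as FlinkCdcWithVerverica, with a configurable startup option.
  def buildSource(startup: StartupOptions): DebeziumSourceFunction[String] = {
    val dbServer = DBServerConstant.mysql_gmall_flink()
    MySqlSource.builder[String]()
      .hostname(dbServer.hostname)
      .port(dbServer.port)
      .username(dbServer.username)
      .password(dbServer.password)
      .databaseList("gmall-210325-flink")
      .tableList("gmall-210325-flink.base_trademark")
      .deserializer(new CustomerDeseriallization())
      .startupOptions(startup)
      .build()
  }

  // First run: full snapshot of the existing rows, then continue with the binlog.
  def initialSource: DebeziumSourceFunction[String] = buildSource(StartupOptions.initial())

  // Later runs: only new changes. Because checkpoints are retained on cancellation,
  // the job can be resubmitted with `flink run -s <checkpoint-path> ...` to resume
  // from the last checkpoint instead of losing data between the two runs.
  def latestSource: DebeziumSourceFunction[String] = buildSource(StartupOptions.latest())
}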