Hudi Hive Sync: Implementing Unified Lakehouse Operations
I. Syncing Hudi Data to Hive
1) Place the compiled hudi-hadoop-mr-bundle-0.10.0.jar into the target environment, under ../CDH/jars and ../CDH/lib/hive/lib. For the detailed steps, see the earlier post "Flink1.3.1+Hudi0.10初探".
cd /app/hudi-0.10.0/packaging/hudi-hadoop-mr-bundle/target
cp hudi-hadoop-mr-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/lib/hive/lib
cp hudi-hadoop-mr-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/jars
# The same jar also needs to be placed in Hive's auxiliary jar directory (auxlib)
cp hudi-hadoop-mr-bundle-0.10.0.jar /usr/local/src/hook/hive
2) Test data
uuid,name,addr,phone,update_date,bir_date
1,逝去的青春,上海市宝山区,183****1111,20200805,20020101
2,葬爱,上海市虹口区,183****2222,20200805,20020101
3,罙罙の回憶,上海市虹口区,183****3333,20200805,20020101
4,忘了天空的颜色,上海市虹口区,183****4444,20200805,20020101
5,李彦龙,上海市松江区,183****5555,20200801,20010101
6,李浩鹏,上海市松江区,183****6666,20200801,20010101
7,李天一,上海市松江区,183****7777,20200801,20010101
8,李朵雯,上海市松江区,183****8888,20200801,20010101
9,李雨杭,上海市松江区,183****9999,20200801,20010101
10,王满,杭州市西湖区,153****0000,20200802,20000101
11,王琳,杭州市西湖区,153****1111,20200802,20000101
12,王昕,杭州市西湖区,153****2222,20200802,20000101
13,贾一一,杭州市西湖区,153****3333,20200802,20000101
14,石浩,西安市莲湖区,137****4444,20200803,19970101
15,石子彤,西安市莲湖区,137****5555,20200803,19970101
16,许放炮的,西安市莲湖区,137****6666,20200803,19970101
3) pom.xml: in my setup, hudi-spark-bundle_2.11 is already added to the project directly, so it is not declared in Maven here.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shydow</groupId>
    <artifactId>spark-hudi-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
II. Querying the synced Hive tables with spark2.4.0-cdh6.2.1 throws errors, because the source code was previously modified in order to write the data; upgrading Spark to 2.4.3 or later is recommended (the steps below were run on Spark 2.4.5).
Note: if you only write upstream data into Hudi and sync it to Hive, and Spark never reads the generated rt or ro tables afterwards, version 2.4.0 is fine. But if Spark needs to keep processing the generated Hive tables, upgrade to 2.4.3 or later and copy hive-site.xml into spark/conf.
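As a quick sanity check of what the note implies, here is a minimal sketch (an illustration only, assuming Spark 2.4.3+ and that hive-site.xml has already been copied into spark/conf so enableHiveSupport() can reach the metastore; the table name member_rt comes from the example further below):

import org.apache.spark.sql.SparkSession

// Minimal sketch: read a table that Hudi synced to Hive, assuming Spark 2.4.3+
// and hive-site.xml placed in spark/conf so the Hive metastore is reachable.
object ReadSyncedTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-synced-table")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // member_rt is the real-time view Hive sync registers for the MOR table "member"
    spark.read.table("default.member_rt").where("part = '20200801'").show(false)

    spark.close()
  }
}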
1) Writing data
package com.shydow.Hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * @author Shydow
 * @date 2021/12/26 0:35
 * @desc Sync Hudi data to Hive
 */
object HudiSyncHiveTest {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    /* Insert data and sync it to Hive at the same time */
    insert(spark, "file:///D:\\IdeaProjects\\spark-hudi-tutorial\\data\\test.csv")

    /* Read the table synced into Hive */
    spark.read.table("default.member_rt").where("part=20200801").show()

    /* Read the Hudi table directly */
    spark.read.format("hudi")
      .load("/workspace/hudi/hive")
      .show()

    spark.close()
  }

  def insert(spark: SparkSession, path: String) = {
    import org.apache.spark.sql.functions._
    val timestamp: String = System.currentTimeMillis().toString // generate the commit timestamp
    val frame: DataFrame = spark.read.option("header", "true")
      .csv(path)
    val insertDF: DataFrame = frame.withColumn("ts", lit(timestamp))
      .withColumn("part", col("update_date"))

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    insertDF.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // precombine field (update timestamp)
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part") // Hudi partition column
      .option("hoodie.table.name", "member") // Hudi table name ("hoodie.table.name")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000") // HiveServer2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default") // Hive database to sync into
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // Hive table name to sync into
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part") // partition column of the synced Hive table
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition value extractor, splits the partition path by "/"
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to Hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // whether a record's partition directory moves when its partition value changes
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM; GLOBAL_BLOOM is required so records can still be located after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Overwrite)
      .save("/workspace/hudi/hive")
  }
}
Submit the job with spark-submit:
./bin/spark-submit --master yarn --driver-memory 1g --num-executors 2 --executor-memory 2g --executor-cores 2 --jars ./jars/hudi-spark-bundle_2.11-0.10.0.jar --class com.shydow.Launcher ./workspace/spark-hudi-tutorial-1.0-SNAPSHOT-jar-with-dependencies.jar
Table definitions of the ro and rt tables generated in Hive:
CREATE EXTERNAL TABLE `user_info_ro`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `uuid` string,
  `name` string,
  `addr` string,
  `phone` string,
  `update_date` string,
  `bir_date` string,
  `ts` string)
PARTITIONED BY (
  `part` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='true',
  'path'='/workspace/launcher2')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hadoop001:8020/workspace/launcher2'
TBLPROPERTIES (
  'last_commit_time_sync'='20211227235552794',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numPartCols'='1',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"addr","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"update_date","type":"string","nullable":true,"metadata":{}},{"name":"bir_date","type":"string","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"part","type":"string","nullable":true,"metadata":{}}]}',
  'spark.sql.sources.schema.partCol.0'='part',
  'transient_lastDdlTime'='1640620564')
CREATE EXTERNAL TABLE `user_info_rt`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `uuid` string,
  `name` string,
  `addr` string,
  `phone` string,
  `update_date` string,
  `bir_date` string,
  `ts` string)
PARTITIONED BY (
  `part` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='false',
  'path'='/workspace/launcher2')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hadoop001:8020/workspace/launcher2'
TBLPROPERTIES (
  'last_commit_time_sync'='20211227235552794',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numPartCols'='1',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"addr","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"update_date","type":"string","nullable":true,"metadata":{}},{"name":"bir_date","type":"string","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"part","type":"string","nullable":true,"metadata":{}}]}',
  'spark.sql.sources.schema.partCol.0'='part',
  'transient_lastDdlTime'='1640620564')
When the Hudi table type is DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Hive sync generates both a ro and a rt table; with DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL only a single table is generated, as sketched below.
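To illustrate that difference, here is the same write with only the table type swapped to COPY_ON_WRITE; under that option Hive sync should register a single table "member" instead of the member_ro/member_rt pair. This is a sketch only: the method name insertCow and the target path are hypothetical, and the remaining options mirror the insert example above.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: same write as insert() above, but with COPY_ON_WRITE as the table type,
// so Hive sync registers one table instead of the _ro/_rt pair.
def insertCow(insertDF: DataFrame): Unit = {
  insertDF.write.format("hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part")
    .option("hoodie.table.name", "member")
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000")
    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member")
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part")
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .mode(SaveMode.Overwrite)
    .save("/workspace/hudi/cow_demo") // hypothetical COW target path for this sketch
}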
2) Updating data
def update(spark: SparkSession, path: String) = {
  import org.apache.spark.sql.functions._
  val frame: DataFrame = spark.read.option("header", "true")
    .csv(path)

  // Modify part of the data
  val timestamp: String = System.currentTimeMillis().toString
  val result: DataFrame = frame.where("bir_date='19970101'")
    .withColumn("bir_date", from_unixtime(unix_timestamp(col("bir_date"), "yyyyMMdd"), "yyyy/MM/dd"))
    .withColumn("update_date", lit("20211227"))
    .withColumn("name", lit("baron"))
    .withColumn("ts", lit(timestamp))
    .withColumn("part", col("update_date"))

  result.write.format("hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid") // record key (primary key)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // precombine field (update timestamp)
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part") // Hudi partition column
    .option("hoodie.table.name", "member") // Hudi table name ("hoodie.table.name")
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000") // HiveServer2 address
    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default") // Hive database to sync into
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // Hive table name to sync into
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part") // partition column of the synced Hive table
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition value extractor, splits the partition path by "/"
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to Hive
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // whether a record's partition directory moves when its partition value changes
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM; GLOBAL_BLOOM is required so records can still be located after a partition change
    .option("hoodie.insert.shuffle.parallelism", "12")
    .option("hoodie.upsert.shuffle.parallelism", "12")
    .mode(SaveMode.Append)
    .save("/workspace/hudi/hive")
}
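To see the effect of this upsert on the two views, a small check like the following can be run (a sketch, assuming compaction has not yet run on the MOR table: the real-time view merges base files with the delta logs, so member_rt should already show the rows renamed to "baron", while the read-optimized view member_ro reflects the change only after compaction):

import org.apache.spark.sql.SparkSession

// Sketch: compare the read-optimized and real-time views after the update,
// assuming the MOR table has not been compacted yet.
def checkUpdate(spark: SparkSession): Unit = {
  // Real-time view: base parquet files merged with log files on read,
  // so the updated rows (name = 'baron') should be visible immediately.
  spark.read.table("default.member_rt").where("name = 'baron'").show(false)

  // Read-optimized view: reads only compacted base files,
  // so it picks up the update only after compaction.
  spark.read.table("default.member_ro").where("name = 'baron'").show(false)
}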
3) Querying the incremental view
def incrementalQuery(spark: SparkSession) = {
  spark.read.format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20211227000657989")
    .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20211228003845629")
    .load("/workspace/hudi/hive")
    .show(false)
}
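The begin/end instant times above are commit timestamps from the table's timeline. If you don't have them at hand, one simple way to find candidates (a sketch using only the standard snapshot read, not any timeline API) is to list the distinct _hoodie_commit_time values in the table and pick the bounds from there:

import org.apache.spark.sql.SparkSession

// Sketch: list the commit instants present in the table so that suitable
// begin/end instant times can be chosen for the incremental query.
def listCommitInstants(spark: SparkSession): Unit = {
  spark.read.format("hudi")
    .load("/workspace/hudi/hive")
    .select("_hoodie_commit_time")
    .distinct()
    .orderBy("_hoodie_commit_time")
    .show(false)
}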