A First Look at Integrating Hudi 0.10 with Spark 2.4 on CDH 6.2.1
I. Building Hudi
1) Download Hudi 0.10. Because the Spark bundled with CDH 6.2.1 is 2.4.0, a small source change is required: comment out the entire if block in question, otherwise the build fails.
2) Put the built hudi-spark-bundle_2.11-0.10.0.jar into the jars directory under SPARK_HOME.
# Build
mvn clean package -DskipTests
# Move the jar into SPARK_HOME
mv ./hudi-spark-bundle_2.11-0.10.0.jar ${SPARK_HOME}/jars
II. Testing with spark-shell
1) Start spark-shell
# On CDH the default Spark is fine; it is already configured in the environment variables
spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
# With another Spark version, e.g. spark-2.4.4, in yarn-client mode
spark-shell --master yarn-client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --jars ../jars/hudi-spark-bundle_2.11-0.10.0.jar --packages org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
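Once the shell is up, a quick sanity check (my own suggestion, not a step from the original walkthrough) is to confirm that a class from the Hudi bundle resolves:

// If hudi-spark-bundle_2.11-0.10.0.jar was picked up, this import succeeds inside spark-shell;
// otherwise it fails with "object hudi is not a member of package org.apache"
import org.apache.hudi.DataSourceWriteOptions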
2) Official quick-start example
// Run the following in the shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
// Use an HDFS path here rather than a local path
// val basePath = "file:///tmp/hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

// Insert data
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// Query data
val frame = spark.read.format("hudi")
  .load(basePath)
frame.show()
frame.where("fare > 20.0").select("fare", "begin_lon", "begin_lat", "ts").show()
frame.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

// Query as of a point in time on the timeline
spark.read.format("hudi")
  .option("as.of.instant", "2021-12-24")
  .load(basePath)
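The official quick start goes on to cover updates and incremental reads, which are worth trying in the same shell session. Below is a minimal sketch under the same setup; it reuses tableName, basePath and dataGen from above, and the incremental read assumes at least two commits exist (e.g. the initial insert plus this update):

// Update: generate changes for keys that were already inserted and append them to the table
val updates = convertToStringList(dataGen.generateUpdates(10))
val updateDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))
updateDF.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Incremental read: only records committed after beginTime
val commits = spark.read.format("hudi").load(basePath).
  select("_hoodie_commit_time").distinct().orderBy("_hoodie_commit_time").
  collect().map(_.getString(0))
val beginTime = commits(commits.length - 2) // start from the second-to-last commit
val incDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", beginTime).
  load(basePath)
incDF.show()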
3) Test code in IDEA; the built hudi-spark-bundle_2.11-0.10.0.jar needs to be added to your own project
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shydow</groupId>
    <artifactId>spark-hudi-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0-cdh6.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0-cdh6.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0-cdh6.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0-cdh6.2.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Test code
package com.shydow.Hudi

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

/**
 * @author Shydow
 * @date 2021/12/25 15:13
 * @desc Hudi write test. For a CDH cluster deployed on Alibaba Cloud servers, set
 *       dfs.client.use.datanode.hostname = true in hdfs-site.xml.
 */
object HudiTest {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    /* Insert data */
    // insertData(spark)

    /* Query data */
    queryData(spark, "/workspace/hudi_trips_cow")

    /* Query as of an instant on the timeline */
    queryWithTime(spark, "/workspace/hudi_trips_cow", "2021-12-24")

    spark.close()
  }

  /** Write the quick-start sample data as a COW table. */
  def insertData(spark: SparkSession) = {
    val dataGen = new DataGenerator()
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val frame: DataFrame = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    frame.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option("hoodie.table.name", "hudi_trips_cow").
      mode(Overwrite).
      save("/workspace/hudi_trips_cow")
  }

  /** Snapshot query of the table at basePath. */
  def queryData(spark: SparkSession, basePath: String) = {
    val frame: DataFrame = spark.read.format("hudi")
      .load(basePath)
    frame.show()
    frame.where("fare > 20.0").select("fare", "begin_lon", "begin_lat", "ts").show()
  }

  /**
   * Time-travel query.
   * @param time e.g. "2021-07-28" is interpreted as "2021-07-28 00:00:00"
   */
  def queryWithTime(spark: SparkSession, basePath: String, time: String) = {
    val frame: DataFrame = spark.read.format("hudi")
      .option("as.of.instant", time)
      .load(basePath)
    frame.show()
  }
}
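Deletes follow the same write path. Below is a sketch based on the official quick-start delete flow, using the same imports as HudiTest above; the helper name deleteData is added here for illustration and is not part of the original code. It reads a couple of existing keys back from the table and writes them with the delete operation:

  /** Delete two existing records (sketch; assumes the table at basePath already has data). */
  def deleteData(spark: SparkSession, basePath: String) = {
    val dataGen = new DataGenerator()
    // Pick two existing keys from a snapshot read of the table
    val snapshot = spark.read.format("hudi").load(basePath)
    snapshot.createOrReplaceTempView("hudi_trips_snapshot")
    val toDelete = spark.sql("select uuid, partitionpath from hudi_trips_snapshot limit 2")
    val deletes = dataGen.generateDeletes(toDelete.collectAsList())
    val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
    deleteDF.write.format("hudi").
      options(getQuickstartWriteConfigs).
      // switch the write operation from the default upsert to delete
      option("hoodie.datasource.write.operation", "delete").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option("hoodie.table.name", "hudi_trips_cow").
      mode(Append).
      save(basePath)
  }

After running it, a snapshot query of the table should return two fewer rows than before.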