Hudi-SparkSQL增删改查Hudi表
进入spark-sql shell
./spark-sql --master local[2] --jars /Users/FengZhen/Desktop/Hadoop/spark/spark-3.0.3-bin-hadoop2.7/jars/spark-avro_2.12-3.0.3.jar,/Users/FengZhen/Desktop/Hadoop/hudi/spark3.0.3/hudi-0.10.1/packaging/hudi-spark-bundle/target/hudi-spark3.0.3-bundle_2.12-0.10.1.jar --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

hudi默认upsert/insert/delete的并发度是1500,对于演示小规模数据集设置更小的并发度
set hoodie.upsert.shuffle.parallelism = 1;
set hoodie.insert.shuffle.parallelism = 1;
set hoodie.delete.shuffle.parallelism = 1;

设置不同步hudi表元数据
-- Do not sync Hudi table metadata to an external metastore for this session.
SET hoodie.datasource.meta.sync.enable=false;
1.建表(带LOCATION表示为外部表)
创建分区表,表的类型为MOR,主键为id,分区字段为dt,合并字段为ts

CREATE TABLE test_hudi_table( id INT, name STRING, price DOUBLE, ts LONG, dt STRING ) USING hudi PARTITIONED BY (dt) OPTIONS( primaryKey = 'id', preCombineField = 'ts', type = 'mor' ) LOCATION 'hdfs://localhost:9000/hudi-warehouse/test_hudi_table';
根据已有表创建
-- Register a table over an existing Hudi dataset; the schema and Hudi
-- configuration are read from the files already present under LOCATION.
CREATE TABLE hudi_existing_tbl1
USING hudi
PARTITIONED BY (dt)
LOCATION 'hdfs://localhost:9000/hudi-warehouse/test_hudi_table';
用查询结果创建新表
-- CTAS: create a non-partitioned Hudi table from an inline query result.
CREATE TABLE test_hudi_ctas_cow_tbl
USING hudi
TBLPROPERTIES (primaryKey = 'id')
AS
SELECT
    1    AS id,
    'a1' AS name,
    10   AS price;
用查询结果创建分区表
-- CTAS: create a COW table partitioned by dt from an inline query result.
CREATE TABLE test_hudi_ctas_cow_pt_tbl
USING hudi
TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'ts'
)
PARTITIONED BY (dt)
AS
SELECT
    1            AS id,
    'a1'         AS name,
    10           AS price,
    1000         AS ts,
    '2022-03-07' AS dt;
创建parquet数据表
-- Plain parquet table pointing at the data files of the Hudi dataset.
CREATE TABLE test_hudi_parquet_cow_tbl
USING parquet
LOCATION 'hdfs://localhost:9000/hudi-warehouse/test_hudi_table/*.parquet';
利用数据进行分区
-- CTAS with an explicit LOCATION: load the query result into a new
-- COW table partitioned by datestr, stored at the given path.
CREATE TABLE test_hudi_ctas_cow_pt_tbl1
USING hudi
LOCATION 'hdfs://localhost:9000/hudi-warehouse/test_hudi_table'
OPTIONS (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'ts'
)
PARTITIONED BY (datestr)
AS
SELECT * FROM parquet_mngd;
查看建表语句
show create table test_hudi_table;

CREATE TABLE `default`.`test_hudi_table` ( `_hoodie_commit_time` STRING, `_hoodie_commit_seqno` STRING, `_hoodie_record_key` STRING, `_hoodie_partition_path` STRING, `_hoodie_file_name` STRING, `id` INT, `name` STRING, `price` DOUBLE, `ts` BIGINT, `dt` STRING) USING hudi PARTITIONED BY (dt) LOCATION 'hdfs://localhost:9000/hudi-warehouse/test_hudi_table' TBLPROPERTIES ( 'primaryKey' = 'id', 'type' = 'mor', 'preCombineField' = 'ts')
2.插入数据
-- Seed three records, one statement each, all into partition dt=2022-03-07.
INSERT INTO test_hudi_table
SELECT 1 AS id, 'hudi' AS name, 10 AS price, 1000 AS ts, '2022-03-07' AS dt;

INSERT INTO test_hudi_table
SELECT 2 AS id, 'hudi_2' AS name, 9 AS price, 900 AS ts, '2022-03-07' AS dt;

INSERT INTO test_hudi_table
SELECT 3 AS id, 'hudi_3' AS name, 8 AS price, 800 AS ts, '2022-03-07' AS dt;
3.查询
表结构

spark-sql> desc test_hudi_table; _hoodie_commit_time string NULL _hoodie_commit_seqno string NULL _hoodie_record_key string NULL _hoodie_partition_path string NULL _hoodie_file_name string NULL id int NULL name string NULL price double NULL ts bigint NULL dt string NULL # Partition Information # col_name data_type comment dt string NULL Time taken: 0.341 seconds, Fetched 13 row(s)

数据
SELECT * FROM test_hudi_table; 20220307150033176 20220307150033176_0_1 id:1 dt=2022-03-07 9b4ae2d5-956b-488d-b796-829f4cdac7d2-0_0-27-15_20220307150033176.parquet 1 hudi 10.0 1000 2022-03-07 20220307150243202 20220307150243202_0_2 id:2 dt=2022-03-07 9b4ae2d5-956b-488d-b796-829f4cdac7d2-0_0-62-33_20220307150243202.parquet 2 hudi_2 9.0 900 2022-03-07 20220307150302352 20220307150302352_0_3 id:3 dt=2022-03-07 9b4ae2d5-956b-488d-b796-829f4cdac7d2-0_0-97-51_20220307150302352.parquet 3 hudi_3 8.0 800 2022-03-07
4.更新数据
-- Update the price of the record whose primary key id = 1.
UPDATE test_hudi_table SET price = 100.0 WHERE id = 1;
5.删除数据
-- Delete the record whose primary key id = 1.
DELETE FROM test_hudi_table WHERE id = 1;
6.Merge Into
依据判断条件,决定对数据操作时,属于插入insert,更新update,还是删除delete

按条件合并数据
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]

<merge_condition> = A equal bool condition
<matched_action> = DELETE | UPDATE SET * | UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> = INSERT * | INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
-- 1) No target row with this id yet: insert the source row.
MERGE INTO test_hudi_table AS t0
USING (
    SELECT 4 AS id, 'hadoop' AS name, 10 AS price, 9000 AS ts, '2022-03-07' AS dt
) AS s0
ON t0.id = s0.id
WHEN NOT MATCHED THEN INSERT *;

-- 2) Target row with this id exists: update all columns from the source.
MERGE INTO test_hudi_table AS t0
USING (
    SELECT 4 AS id, 'hadoop' AS name, 10 AS price, 9999 AS ts, '2022-03-07' AS dt
) AS s0
ON t0.id = s0.id
WHEN MATCHED THEN UPDATE SET *;

-- 3) Target row with this id exists: delete it.
MERGE INTO test_hudi_table AS t0
USING (
    SELECT 4 AS id, 'hadoop' AS name, 10 AS price, 9999 AS ts, '2022-03-07' AS dt
) AS s0
ON t0.id = s0.id
WHEN MATCHED THEN DELETE;
7.修改表相关信息
修改表名
ALTER TABLE oldTableName RENAME TO newTableName
例子
ALTER TABLE test_hudi_cow_tbl RENAME TO test_hudi_cow_tbl1;

增加列
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
例子
ALTER TABLE test_hudi_cow_tbl1 add columns(desc string);

修改列类型
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
例子
ALTER TABLE test_hudi_cow_tbl1 change column id id bigint;

修改表属性
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
例子
alter table test_hudi_cow_tbl set tblproperties (hoodie.keep.max.commits = '10');
8.分区操作
显示分区
SHOW PARTITIONS tableIdentifier
例子
show partitions test_hudi_cow_pt_tbl;

删除分区
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )
例子
alter table test_hudi_cow_pt_tbl drop partition (dt='2022-03-07');