0. 进入 Flink SQL Client（embedded 模式）shell
# Start the Flink SQL Client in embedded mode and open its interactive shell.
# Run it from the directory that contains sql-client.sh (presumably $FLINK_HOME/bin; adjust the path otherwise).
# NOTE(review): the statements below use the kafka and hudi connectors, so their jars
# (e.g. the Hudi Flink bundle) must be on the client classpath, either in $FLINK_HOME/lib or passed with -j. Confirm for this setup.
./sql-client.sh embedded shell
1. 创建关联 Kafka topic 的源表（source）
-- Source table: order events read as JSON from the Kafka topic order-topic.
CREATE TABLE order_kafka_source (
    `orderId`     STRING,
    `userId`      STRING,
    `orderTime`   STRING,
    `ip`          STRING,
    -- NOTE(review): DOUBLE is inexact for money. Consider DECIMAL(p, s), which needs the same change in the sink table.
    `orderMoney`  DOUBLE,
    `orderStatus` INT
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'order-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id'          = 'gid-1001',
    -- begin at the newest offsets, so records already in the topic are not read
    'scan.startup.mode'            = 'latest-offset',
    'format'                       = 'json',
    -- a field missing from the JSON does not fail the job
    'json.fail-on-missing-field'   = 'false',
    -- fields and rows that fail to parse are skipped (fields become NULL) instead of failing the job
    'json.ignore-parse-errors'     = 'true'
);
2. 创建写入 Hudi 的结果表（sink）
-- Sink table: a Hudi MERGE_ON_READ table on HDFS, upserted by orderId and partitioned by partition_day.
CREATE TABLE order_hudi_sink (
    `orderId`       STRING PRIMARY KEY NOT ENFORCED,
    `userId`        STRING,
    `orderTime`     STRING,
    `ip`            STRING,
    `orderMoney`    DOUBLE,
    `orderStatus`   INT,
    -- precombine field, used by Hudi to choose between records that share the same orderId
    `ts`            STRING,
    `partition_day` STRING
)
PARTITIONED BY (partition_day)
WITH (
    'connector'                               = 'hudi',
    'path'                                    = 'hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',
    'table.type'                              = 'MERGE_ON_READ',
    'write.operation'                         = 'upsert',
    -- record key, the same column as the declared PRIMARY KEY
    'hoodie.datasource.write.recordkey.field' = 'orderId',
    'write.precombine.field'                  = 'ts',
    -- single-task parallelism for both writing and compaction
    'write.tasks'                             = '1',
    'compaction.tasks'                        = '1',
    -- asynchronous compaction, triggered after every single delta commit
    'compaction.async.enable'                 = 'true',
    'compaction.trigger.strategy'             = 'num_commits',
    'compaction.delta_commits'                = '1'
);
3. 将 Kafka 源表数据持续写入 Hudi 表
-- Continuously copy every Kafka order into the Hudi table, deriving the
-- precombine field (ts) and the partition column (partition_day).
-- SQL SUBSTRING positions are 1-based, so the start position is 1, not 0.
-- Flink currently treats 0 like 1, but under ANSI/Calcite semantics a start of 0
-- silently drops the last character (ts would be 16 chars, partition_day 9 chars).
INSERT INTO order_hudi_sink
SELECT
    orderId,
    userId,
    orderTime,
    ip,
    orderMoney,
    orderStatus,
    -- NOTE(review): assumes orderId begins with a fixed-width 17-char timestamp (e.g. yyyyMMddHHmmssSSS), so string order equals time order. Confirm.
    SUBSTRING(orderId, 1, 17)   AS ts,
    -- NOTE(review): assumes orderTime begins with a yyyy-MM-dd date. Confirm.
    SUBSTRING(orderTime, 1, 10) AS partition_day
FROM order_kafka_source;