flink - sql-client连接kafka报错
1. 背景
测试环境连接kafka
2. 步骤
将以下两个jar包上传至./lib目录
flink-connector-kafka_2.11-1.12.7.jar
kafka-clients-2.4.1.jar
启动sql-client
./bin/sql-client.sh embedded
> use catalog default_catalog;
> use default_database;
> CREATE TABLE kafka_source(
inspectResult STRING,
resultDesc STRING,
tradeId STRING,
`timestamp` BIGINT,
joinTime TIMESTAMP(3),
resultTime TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'dw-test-result',
'properties.bootstrap.servers' = 'ip:6667',
'properties.group.id' = 'flinksqlGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json');
> select *from kafka_source;
<模拟生产数据,此处实时消费到。mode: tableau>
维表最好放到HBase或mysql,避免频繁连接hive,太慢
3. 引用
classnotfoundexception org.apache.kafka.common.serialization.ByteArrayDeserializer
flink sql-client 使用记录
Flink SQL Client的使用