Flink - sql-client reports an error when connecting to Kafka


1. Background

Connect the Flink SQL Client to Kafka in the test environment. Out of the box the SQL Client has no Kafka jars on its classpath, so queries against a Kafka table fail with ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer (the error listed in the references below).

2. Steps

Upload the following two jars to the ./lib directory of the Flink installation:

flink-connector-kafka_2.11-1.12.7.jar
kafka-clients-2.4.1.jar
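
If the jars are not already at hand, they can be pulled from Maven Central. A minimal sketch, assuming the standard Maven Central layout, outbound network access from the node, and a hypothetical install path:

cd /opt/flink-1.12.7/lib   # wherever the Flink installation actually lives
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.12.7/flink-connector-kafka_2.11-1.12.7.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar

The SQL Client only picks up jars in ./lib at startup, so it has to be (re)started after the copy.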

Start the SQL Client:

./bin/sql-client.sh embedded

> use catalog default_catalog;
> use default_database;

> CREATE TABLE kafka_source (
    inspectResult STRING,
    resultDesc STRING,
    tradeId STRING,
    `timestamp` BIGINT,
    joinTime TIMESTAMP(3),
    resultTime TIMESTAMP(3)
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dw-test-result',
    'properties.bootstrap.servers' = 'ip:6667',
    'properties.group.id' = 'flinksqlGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
  );
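
To confirm that the table was registered and that the schema looks right, the standard SQL Client commands can be used:

> show tables;
> describe kafka_source;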

> select * from kafka_source;
<Produce some test data to the topic and it is consumed and displayed here in real time. result mode: tableau>
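
Two pieces that were used implicitly above, written out as a sketch. The SET key is the Flink 1.12 SQL Client option for the tableau display; the producer command uses the Kafka 2.4 console tool against the same placeholder broker ip:6667, and the JSON payload is made up to match the kafka_source schema (the TIMESTAMP(3) fields use the json format's default 'yyyy-MM-dd HH:mm:ss' style):

-- in the SQL Client: print results as a plain table instead of the paged view
SET execution.result-mode=tableau;

# from the Kafka installation: produce one hand-written test record
./bin/kafka-console-producer.sh --broker-list ip:6667 --topic dw-test-result
>{"inspectResult":"PASS","resultDesc":"ok","tradeId":"T1001","timestamp":1639036800000,"joinTime":"2021-12-09 16:00:00","resultTime":"2021-12-09 16:00:05"}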

Dimension tables are best kept in HBase or MySQL rather than joined directly against Hive, which would mean frequent Hive connections and is too slow; a sketch follows.
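
For illustration only: a MySQL-backed dimension table through the Flink JDBC connector with a lookup cache. Every name here (database, table, columns, credentials) is hypothetical, the flink-connector-jdbc and MySQL driver jars would also need to be in ./lib, and the lookup join assumes kafka_source declares a processing-time column such as proc AS PROCTIME():

CREATE TABLE dim_trade (                 -- hypothetical dimension table
  tradeId STRING,
  tradeName STRING,
  PRIMARY KEY (tradeId) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://ip:3306/dw',     -- hypothetical MySQL instance and database
  'table-name' = 'dim_trade',
  'username' = 'flink',
  'password' = '***',
  'lookup.cache.max-rows' = '5000',      -- cache rows instead of hitting MySQL per record
  'lookup.cache.ttl' = '10min'
);

-- lookup join against the cached dimension table
SELECT s.tradeId, s.inspectResult, d.tradeName
FROM kafka_source AS s
LEFT JOIN dim_trade FOR SYSTEM_TIME AS OF s.proc AS d
ON s.tradeId = d.tradeId;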

3. References

ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
flink sql-client usage notes (flink sql-client 使用记录)
Using the Flink SQL Client (Flink SQL Client的使用)