在自定义Flink1.10 Sql Sink遇到的问题
1.org.apache.flink.table.api.TableException: Table sink does not implement a table schema.
问题:在RedisTableSink中没有重写getTableSchema方法
解决:增加重写getTableSchema
@Override
public TableSchema getTableSchema() {return tableSchema;}
2.org.apache.flink.table.api.TableException: Table sink does not implement a consumed data type.
问题:在RedisTableSink中没有重写getConsumedDataType方法
解决:增加重写getConsumedDataType
@Override
public DataType getConsumedDataType() {return tableSchema.toRowDataType(); }
3.org.apache.flink.api.common.InvalidProgramException: root
|-- pay_hour: STRING
|-- item_id: STRING
is not serializable. The object probably contains or references non serializable fields.
问题:在RedisTableSink的emitDataStream方法中将tableSchema传到RedisSinkFunction方法中去,而TableSchema未实现Serializable,出现序列化的问题
解决:因为传进去的tableSchema暂时没有用到,所以去掉不传,如有必要传入需要自己扩展一个Schema 然后implements Serializable
4.org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, com.yunji.bigdata.connect.redis.RedisTableSink doesn't implement this method.
问题:使用了废弃的emitDataStream方法,而且没有重写consumeDataStream,再本地单元测试跑是没问题,on yarn跑出现以上错误
解决:将核心代码写在consumeDataStream,emitDataStream再调用consumeDataStream