Summary of Flink Issues


Contents
  • Submitting multiple Flink jobs from multiple threads
  • Flink keeps restarting after receiving dirty data from Kafka

Submitting multiple Flink jobs from multiple threads

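  • If the program contains only the Flink job code, make sure the thread that creates the submitter threads (main) does not finish before the Flink jobs have been submitted (here, main sleeps for 10 s). Otherwise main exits while the submitting threads are still working, and those threads exit with it without completing the submission. Why the child threads exit once main exits is still an open question.
  • With asynchronous submission, the child threads run to completion even after main exits, and submitting to a Flink cluster works. Running locally, however, submission can still fail, so add a 500 ms gap between job submissions there as well.
  • Submitting from multiple threads inside a Spring Boot ApplicationRunner works without problems.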
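The bullets above boil down to: start one submitter thread per job, leave a gap between submissions, and keep main alive until every submission has finished. Below is a minimal stdlib-only sketch of that pattern, assuming each job is submitted from its own thread. `StaggeredSubmitter`, `submitAll` and the `submitFn` callback are hypothetical names. In a real program `submitFn` would wrap an asynchronous submission such as `StreamExecutionEnvironment.executeAsync()`, which returns once the job has been submitted. A blocking `execute()` on an unbounded Kafka job does not return, so `join()` would wait forever. Joining the threads replaces the fixed 10 s sleep in main.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

class StaggeredSubmitter {

    // Starts one thread per job, pausing staggerMillis between thread starts,
    // then waits for every thread to finish. submitFn is a hypothetical
    // placeholder for the real submission call; it should return once the
    // job has been submitted.
    static List<String> submitAll(List<String> jobNames, long staggerMillis,
                                  Consumer<String> submitFn) throws InterruptedException {
        List<String> submitted = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (String name : jobNames) {
            if (!threads.isEmpty()) {
                Thread.sleep(staggerMillis); // gap between consecutive thread starts
            }
            Thread t = new Thread(() -> {
                submitFn.accept(name);
                submitted.add(name); // recorded only if submitFn returned normally
            }, "submit-" + name);
            threads.add(t);
            t.start();
        }
        // join() instead of a fixed sleep: main cannot return before every
        // submitter thread has finished.
        for (Thread t : threads) {
            t.join();
        }
        return submitted;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in submission that only prints; replace with a real async submit.
        List<String> done = submitAll(List.of("jobA", "jobB", "jobC"), 500,
                name -> System.out.println("submitting " + name));
        System.out.println("submitted " + done.size() + " jobs");
    }
}
```

The gap staggers thread starts rather than measuring how long each submission takes, which matches the simple "wait 500 ms between submissions" rule of thumb.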

Flink keeps restarting after receiving dirty data from Kafka

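After reading data from Kafka, the Flink job applied a map whose logic was wrapped in exception handling, yet every dirty record still caused the Flink job to restart.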

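Cause: the class containing the map method did not implement the Serializable interface. When the parallelism is not 1, the operator has to be serialized and transferred.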
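The cause above can be illustrated with plain Java serialization, which Flink uses to ship user functions to the tasks that run them. Below is a minimal stdlib-only sketch. `PlainMapper` and `SerializableMapper` are hypothetical stand-ins for the class holding the map logic. Flink's `MapFunction` interface already extends `Serializable`, so in practice the object that fails is typically a class the function holds or refers to.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializabilityCheck {

    // Hypothetical class holding the map logic; does NOT implement Serializable.
    static class PlainMapper {
        String map(String raw) { return raw.trim(); }
    }

    // The same logic, but implementing Serializable.
    static class SerializableMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        String map(String raw) { return raw.trim(); }
    }

    // Writes obj with plain Java serialization; returns false if any object
    // in its graph is not Serializable.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("PlainMapper: " + canSerialize(new PlainMapper()));               // false
        System.out.println("SerializableMapper: " + canSerialize(new SerializableMapper())); // true
    }
}
```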

2022-04-13 20:39:57 Task.java WARN Source: Custom Source -> Map -> (Flat Map, Flat Map) (2/8) (9d545157586a67d28091360912033fc3) switched from RUNNING to FAILED.
java.lang.UnsupportedOperationException
        at java.base/java.util.ImmutableCollections.uoe(ImmutableCollections.java:71)
        at java.base/java.util.ImmutableCollections$AbstractImmutableCollection.add(ImmutableCollections.java:75)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)
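The top frames of the trace show which call actually threw: Kryo's `CollectionSerializer.copy` called `add()` on a JDK immutable collection (`java.util.ImmutableCollections`) while Flink was copying a POJO between chained operators (`CopyingChainingOutput`). Lists built with `List.of(...)` are such collections. Below is a minimal stdlib-only reproduction of just that failing call. `ImmutableAddDemo` and `addSucceeds` are hypothetical names. Assuming the record's collection field is built with `List.of(...)` or similar, one possible mitigation is to store a mutable copy such as `new ArrayList<>(...)`.

```java
import java.util.ArrayList;
import java.util.List;

class ImmutableAddDemo {

    // Calls add() on the list and reports whether it succeeded. A list from
    // List.of(...) always throws UnsupportedOperationException here, the same
    // exception as at the top of the trace.
    static boolean addSucceeds(List<String> list, String element) {
        try {
            list.add(element);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> immutable = List.of("a", "b");                // java.util.ImmutableCollections
        List<String> mutable = new ArrayList<>(List.of("a", "b")); // mutable copy, same elements

        System.out.println(immutable.getClass().getName());
        System.out.println("List.of add:   " + addSucceeds(immutable, "c")); // false
        System.out.println("ArrayList add: " + addSucceeds(mutable, "c"));   // true
    }
}
```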
