整合 KAFKA+Flink 实例(第一部分,趟坑记录)

2017年后,一大波网络喧嚣,说流式处理如何牛叉,如何高大上,抱歉,工作满负荷,没空玩那个;

今年疫情隔离在家,无聊,开始学习 KAFKA+Flink ,目前的打算是用爬虫抓取网页数据,传递到Kafka中,再用Flink计算。

个人性格原因,我不愿意过分沉迷于纸质或者电子教程材料,也不是特别喜欢网上某些培训机构已经过时了的所谓培训视频,

喜欢动手直接写代码,所以简单翻看一点PDF教程,看了两集“培训视频”,也没说Kafka、flink两组件咋结合使用,不耐烦,直接开码(码农的糙性);

之前我写过的随笔已经有在windows上装Kafka、flink组件了,之前写了一个入门的Kafka使用代码;

算是有简单的 Kafka使用成功案例;从昨天开始,我开始重新码代码;

我先叨逼叨几句,顺便把趟坑的过程一并写上,后续我会把相关代码以及整体思路,整理好后,一并再发出来。

  • 一   叨逼叨

网上的KAFKA+Flink 的例子百分之90都是读取文本文件,或者弄个循环10000次,查内存数据,这两种案例,来讲解Flink的处理机制;

例如: https://www.cnblogs.com/huxi2b/p/7219792.html     https://blog.csdn.net/weixin_44575542/article/details/88594773

我迅速浏览代码后,忍俊不禁,为什么做10000次的循环,超过10000次就不跑了?这是什么应用场景?

有人说,做一个while(true)循环不就得了,是,可以,还是为了达到写例子而写代码;请问,如果你用了无限循环,某天用户说,我临时决定,暂停下,过会再跑,你怎么弄?停掉整个应用?有人又会说,接入前端信号就行。

因为应用场景决定了应用的架构、功能以及开发的方向,对此我不想抬杠,这个话题就到此结束吧。

  • 二   接着 叨逼叨

大多网友传递对象时,都是自己手动序列化对象,甚至直接用字符串,中间用逗号分隔;例如

 1 String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
 2 System.out.println("发送数据-->"+value);
 3 producer.send(new ProducerRecord<Object, String>("demo", value), new Callback() {
 4 @Override
 5 public void onCompletion(RecordMetadata metadata, Exception exception) {
 6 if (exception != null) {
 7 System.out.println("Failed to send message with exception " + exception);
 8 }
 9 }
10 });

我看了,就想问,你这对象挺简单的哈,要是字符串对象中的值,本身就有逗号,你咋办?

另外,有些对象属性类型复杂,既有String,又有 BigDecimal ,甚至里面有嵌入 ArrayList对象,你这咋整啊?

不序列化了?或者都toString了,再整一起??

  • 三  反省

网友大多使用对象 FlinkKafkaConsumer011  来接收处理Flink数据,说实话,我挺不屑的,感觉都在互相抄代码,没劲;

我第一次弄的时候,用的是  FlinkKafkaConsumer082 ,我误以为082比011版本高,直到我在某一阶段全部报有关 FlinkKafkaConsumer082 的错误时,我才开始意识到我的错误;

Apache组织命名对象的版本时,真心会让我混乱,难道82不比11大吗?难道版本号不是越大越新吗?最后,事实我告诉自己,真的,真的不是!

所以在抄别人代码前,还是自己去官网确定版本吧,甚至都不能信阿里的maven站,当以 artifactId  名称来搜索排序时,一定要多往下拉数据看,阿里的排序不是最高版本在最前面,往往既不在最前面,也不在最后面;

  • 四  处理异常

截止到现在,我先写取Kafka数据输出到本地文本,使用了对象  SingleOutputStreamOperator   但是

出现异常:

17:48:59.084 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
17:48:59.084 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
17:48:59.558 [main] WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable ‘log.file‘ is not set.
17:48:59.558 [main] WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable ‘log.file‘ or configuration key ‘Key: ‘web.log.path‘ , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])‘.
17:48:59.646 [main] DEBUG org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Failed to load web based job submission extension.
org.apache.flink.util.FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.
	at org.apache.flink.runtime.webmonitor.WebMonitorUtils.loadWebSubmissionExtension(WebMonitorUtils.java:192)
	at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeHandlers(DispatcherRestEndpoint.java:98)
	at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:141)
	at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:161)
	at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:378)
	at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:313)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:114)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
	at com.kafkastudy.kafka01.FlinkDealWithKafka.main(FlinkDealWithKafka.java:93)
17:48:59.808 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - Platform: Windows
17:48:59.814 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
17:48:59.815 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - Java version: 8
17:48:59.822 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
17:48:59.826 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
17:48:59.830 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
17:48:59.834 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
17:48:59.838 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
17:48:59.838 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
17:48:59.838 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
17:48:59.839 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
17:48:59.840 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\ADMINI~1\AppData\Local\Temp (java.io.tmpdir)
17:48:59.841 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
17:48:59.848 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 1888485376 bytes
17:48:59.849 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
17:48:59.855 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
17:48:59.856 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
。。。。此处省略10万字。。。。。。。。。。。。
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
17:49:02.797 [Map -> Sink: Unnamed (3/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (3/4) (98cf4b7fc2b591cf937bc7a97aab620b).
17:49:02.797 [Map -> Sink: Unnamed (3/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (3/4) network resources (state: FAILED).
17:49:02.797 [Map -> Sink: Unnamed (3/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (3/4) (98cf4b7fc2b591cf937bc7a97aab620b): Releasing org.apa[email protected]18cbdfe5.
17:49:02.797 [Map -> Sink: Unnamed (3/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition [email protected]f [PIPELINED_BOUNDED, 4 subpartitions, 3 pending consumptions]: Received consumed notification for subpartition 2.
17:49:02.805 [Map -> Sink: Unnamed (4/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
17:49:02.806 [Map -> Sink: Unnamed (4/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80).
17:49:02.806 [Map -> Sink: Unnamed (4/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (4/4) network resources (state: FAILED).
17:49:02.806 [Map -> Sink: Unnamed (4/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80): Releasing org.apa[email protected]4e47cea0.
17:49:02.806 [Map -> Sink: Unnamed (4/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition [email protected]f [PIPELINED_BOUNDED, 4 subpartitions, 2 pending consumptions]: Received consumed notification for subpartition 3.
17:49:02.806 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
17:49:02.807 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7).
17:49:02.807 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (1/4) network resources (state: FAILED).
17:49:02.807 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7): Releasing org.apa[email protected]d04e29b.
17:49:02.807 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition [email protected]f [PIPELINED_BOUNDED, 4 subpartitions, 1 pending consumptions]: Received consumed notification for subpartition 0.
17:49:02.811 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking Map -> Sink: Unnamed (2/4)
17:49:02.811 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(2/4) with empty state.
17:49:02.811 [Map -> Sink: Unnamed (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
17:49:02.812 [Map -> Sink: Unnamed (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2).
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (2/4) network resources (state: FAILED).
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2): Releasing org.apa[email protected]54ec612c.
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition [email protected]f [PIPELINED_BOUNDED, 4 subpartitions, 0 pending consumptions]: Received consumed notification for subpartition 1.
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Received consume notification from ReleaseOnConsumptionResultPartition [email protected]f [PIPELINED_BOUNDED, 4 subpartitions, 0 pending consumptions].
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Releasing ReleaseOnConsumptionResultPartition [email protected]f [PIPELINED_BOUNDED, 4 subpartitions, 0 pending consumptions].
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#0 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false].
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#1 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false].
17:49:02.813 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#2 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false].
17:49:02.813 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#3 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false].
17:49:02.814 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Released partition 6bf24c587ed797fcbedd1d758500d61c produced by fda750c24ba9ee2e3576ba73b3fe76ef.
17:49:02.828 [Map -> Sink: Unnamed (3/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (3/4) (98cf4b7fc2b591cf937bc7a97aab620b) [FAILED]
17:49:02.845 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
17:49:02.845 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef).
17:49:02.845 [Source: Custom Source (1/1)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Source: Custom Source (1/1) network resources (state: FAILED).
17:49:02.845 [Source: Custom Source (1/1)] DEBUG org.apache.flink.runtime.io.network.TaskEventDispatcher - unregistering [email protected]f
17:49:02.854 [Map -> Sink: Unnamed (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2) [FAILED]
17:49:02.864 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7) [FAILED]
17:49:02.872 [Map -> Sink: Unnamed (4/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80) [FAILED]
17:49:02.875 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed c54ff28542a5211674c06383230451d2.
17:49:02.882 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef) [FAILED]
17:49:02.916 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 617855a53333899f9ea43b4a5cbf89d7.
17:49:02.922 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 6aac35ea1e046a7f786795f9f10aca80.
17:49:02.925 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 98cf4b7fc2b591cf937bc7a97aab620b.
17:49:02.929 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source fda750c24ba9ee2e3576ba73b3fe76ef.
17:49:02.943 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
17:49:02.944 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (cf694450927f961839bafbb133deb26a) switched from state RUNNING to FAILING.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
。。。。。。。。此处省略10万字。。。。。。。。。
17:49:03.393 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 26 common frames omitted
17:49:03.397 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 26 common frames omitted
17:49:03.399 [FileCache shutdown hook] INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-dist-cache-bb7b3455-c09d-4f72-b976-700dbe04fa8b
17:49:03.400 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 26 common frames omitted
17:49:03.402 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 26 common frames omitted
17:49:03.404 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 26 common frames omitted
17:49:03.405 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution ca30abbed6fe422e447a2317fd6acebd.
17:49:03.406 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 55c3c6e5f98f094b513124cfa9d2e535.
17:49:03.406 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 55c3c6e5f98f094b513124cfa9d2e535.
17:49:03.406 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution ca30abbed6fe422e447a2317fd6acebd.
17:49:03.412 [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:60925
17:49:03.413 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (b80fb3c7568cd61fb19014367644963a) switched from DEPLOYING to FAILED.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
	at akka.dispatch.OnComplete.internal(Future.scala:263)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.actor.ActorRef.tell(ActorRef.scala:126)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:285)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	... 21 common frames omitted
17:49:03.414 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (cf694450927f961839bafbb133deb26a) switched from state RUNNING to FAILING.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
	at akka.dispatch.OnComplete.internal(Future.scala:263)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.actor.ActorRef.tell(ActorRef.scala:126)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:285)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	... 21 common frames omitted
17:49:03.421 [IOManagerAsync shutdown hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-io-d51c6c45-5dc5-4838-885c-57cba32545a8

Process finished with exit code 130

其实就两个,其他先不着急关注;

注意:

java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V

我看到它觉得有些亲切,让我回味以前我使用过的Apache的 IOUtils 包,它是那么的粗壮,尤其是它的读写速度

处理方式:

因为原来我这个程序最早用于网络爬虫,所以使用了Apache的IO包,我的pom.xml就有如下依赖

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>

估计是我这个commons-io 版本太老了,没有对应Flink需要的那个方法,所以,我干脆去掉,让maven自己获取最新的依赖;

再跑一次,那个异常不在了;

另外,加入了

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>1.9.2</version>
            <scope>provided</scope>
        </dependency>

我想理论上能够看到Flink的WEB管理页面;暂时没调通,以后玩;

接着出现如下问题

  • 五    再次爆出异常
6) switched from CANCELING to CANCELED.
18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (3/4) - execution #64 to FAILED while being CANCELED.
18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing slot [SlotRequestId{42b886c8702b236d3f8170ef7539f93a}] because: Release multi task slot because all children have been released.
18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Adding returned slot [e7657f78097e6f28e444f35fdc894b1f] to available slots
18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) if no longer possible.
org.apache.kafka.common.config.ConfigException: Invalid value true for configuration auto.offset.reset: Expected value to be a string, but it was a java.lang.Boolean
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:664)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:473)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:466)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
	at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:544)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
18:44:04.775 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) switched from state FAILING to RESTARTING.
18:44:04.775 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa).
------。。。。。。。。。。。此处省略10万字。。。。。。。。。。。。----------18:44:04.787 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Created PipelinedSubpartitionView(index: 0) of ResultPartition [email protected]e
18:44:04.787 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from DEPLOYING to RUNNING.
18:44:04.787 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing Map -> Sink: Unnamed (1/4).
18:44:04.787 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: ‘null‘, savepoints: ‘null‘, asynchronous: TRUE, maxStateSize: 5242880)
18:44:04.786 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Found existing local state store for b2bddb8ca16ac234d435cdecc01659fa - 20ba6b65f97481d5570070de90e4e791 - 2 under allocation id e7657f78097e6f28e444f35fdc894b1f: [email protected]c77
18:44:04.789 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory - Map -> Sink: Unnamed (3/4) (6c2a4ed7a7a5f3340c223b8ff8abde09): Created 1 input channels (local: 1, remote: 0, unknown: 0).
18:44:04.790 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Map -> Sink: Unnamed (3/4).
18:44:04.786 [Source: Custom Source (1/1)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing Source: Custom Source (1/1).
18:44:04.790 [Source: Custom Source (1/1)] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: ‘null‘, savepoints: ‘null‘, asynchronous: TRUE, maxStateSize: 5242880)
18:44:04.786 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (00d802a0655bc54031f3d5224b3cecde) switched from DEPLOYING to RUNNING.
18:44:04.790 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from DEPLOYING to RUNNING.
18:44:04.791 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Found existing local state store for b2bddb8ca16ac234d435cdecc01659fa - 20ba6b65f97481d5570070de90e4e791 - 3 under allocation id 9a9df885ea71c266458b3e8f0b91680b: [email protected]4
18:44:04.791 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory - Map -> Sink: Unnamed (4/4) (7bced7a815a0d1395dd4ae86fbeb2d4d): Created 1 input channels (local: 1, remote: 0, unknown: 0).
18:44:04.791 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Map -> Sink: Unnamed (4/4).
18:44:04.792 [Source: Custom Source (1/1)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking Source: Custom Source (1/1)
18:44:04.792 [Source: Custom Source (1/1)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1) with empty state.
18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking Map -> Sink: Unnamed (1/4)
18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/4) with empty state.
18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamMap_20ba6b65f97481d5570070de90e4e791_(1/4) with empty state.
18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.api.common.io.FileOutputFormat - Opening stream for output (1/4). WriteMode=NO_OVERWRITE, OutputDirectoryMode=PARONLY
18:44:04.792 [Source: Custom Source (1/1)] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
18:44:04.792 [Source: Custom Source (1/1)] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
18:44:04.793 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from RUNNING to FAILED.
java.nio.file.FileAlreadyExistsException: File already exists: D:/temp/flink.txt/1
	at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:264)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
	at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
	at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
	at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
18:44:04.794 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d).
18:44:04.794 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (1/4) network resources (state: FAILED).
......................此处省略10万字...........................
8:44:04.795 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (eb60a3819acf31d006784b95bb003336) switched from DEPLOYING to RUNNING.
18:44:04.795 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 4451b752c6d7962b0e1bc728e050935d.
18:44:04.796 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from RUNNING to FAILED.
java.nio.file.FileAlreadyExistsException: File already exists: D:/temp/flink.txt/1
	at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:264)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
	at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
	at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
	at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
18:44:04.796 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) switched from state RUNNING to FAILING.
java.nio.file.FileAlreadyExistsException: File already exists: D:/temp/flink.txt/1
	at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:264)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
	at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
	at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
	at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
......................此处省略10万字...........................

18:44:06.042 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map -> Sink: Unnamed (4/4) (attempt #86) to 7bd8e6bd-ed0d-4db4-89a3-13dbdfe20ea3 @ 127.0.0.1 (dataPort=-1)
18:44:06.045 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.047 [FileCache shutdown hook] INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-dist-cache-7995f667-7f90-4b54-90f1-c9664767a3f5
18:44:06.047 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.050 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.051 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.052 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.053 [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:62829
18:44:06.056 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (3a1619b08946aa7586897c84bf1131ea) switched from DEPLOYING to FAILED.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
	at akka.dispatch.OnComplete.internal(Future.scala:263)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.actor.ActorRef.tell(ActorRef.scala:126)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:285)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	... 21 common frames omitted
18:44:06.057 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) switched from state RUNNING to FAILING.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
	at akka.dispatch.OnComplete.internal(Future.scala:263)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.actor.ActorRef.tell(ActorRef.scala:126)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:285)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	... 21 common frames omitted
18:44:06.058 [IOManagerAsync shutdown hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-io-d082b78e-7493-40ec-b8ff-fc90c6732b2a
18:44:06.062 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (e5733c10c04ba481354fe3d79af5ef6b) switched from DEPLOYING to CANCELING.
18:44:06.062 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (22926cc493593a6a463e5f786cb2cd43) switched from DEPLOYING to CANCELING.
18:44:06.063 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (3/4) (4a02b51aa4dec540ee746206b06f785e) switched from DEPLOYING to CANCELING.
18:44:06.063 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (4/4) (80c7dad2388100174846eb73d5a4fbe9) switched from DEPLOYING to CANCELING.
18:44:06.063 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 3a1619b08946aa7586897c84bf1131ea.
18:44:06.063 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 3a1619b08946aa7586897c84bf1131ea.
18:44:06.063 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (e5733c10c04ba481354fe3d79af5ef6b) switched from CANCELING to CANCELED.
18:44:06.063 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (1/4) - execution #86 to FAILED while being CANCELED.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing slot [SlotRequestId{e7ce722f0c66508acf6c6ea7a5713852}] because: Release multi task slot because all children have been released.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Adding returned slot [1299eb96fe855b9d78f625318b25968f] to available slots
18:44:06.064 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution e5733c10c04ba481354fe3d79af5ef6b.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (22926cc493593a6a463e5f786cb2cd43) switched from CANCELING to CANCELED.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (2/4) - execution #86 to FAILED while being CANCELED.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing slot [SlotRequestId{2761215e5670bc554a8f4d405a61f4ae}] because: Release multi task slot because all children have been released.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Adding returned slot [d2e43c266e45fbb8540f8880c5788bb7] to available slots
18:44:06.064 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 22926cc493593a6a463e5f786cb2cd43.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (3/4) (4a02b51aa4dec540ee746206b06f785e) switched from CANCELING to CANCELED.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (3/4) - execution #86 to FAILED while being CANCELED.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing slot [SlotRequestId{a58502798a0689a46dd943b8ecd6ee04}] because: Release multi task slot because all children have been released.
18:44:06.064 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Adding returned slot [e7657f78097e6f28e444f35fdc894b1f] to available slots
18:44:06.064 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 4a02b51aa4dec540ee746206b06f785e.
18:44:06.065 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (4/4) (80c7dad2388100174846eb73d5a4fbe9) switched from CANCELING to CANCELED.
18:44:06.065 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (4/4) - execution #86 to FAILED while being CANCELED.
18:44:06.065 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing slot [SlotRequestId{2c9855ad47db7b18db00ad5b52837df4}] because: Release multi task slot because all children have been released.
18:44:06.065 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Adding returned slot [9a9df885ea71c266458b3e8f0b91680b] to available slots
18:44:06.065 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) if no longer possible.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
	at akka.dispatch.OnComplete.internal(Future.scala:263)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.actor.ActorRef.tell(ActorRef.scala:126)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:285)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	... 21 common frames omitted
18:44:06.065 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) switched from state FAILING to RESTARTING.
18:44:06.065 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa).
18:44:06.066 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 80c7dad2388100174846eb73d5a4fbe9.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Source: Custom Source (1/1) for new execution.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Map -> Sink: Unnamed (1/4) for new execution.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Map -> Sink: Unnamed (2/4) for new execution.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Map -> Sink: Unnamed (3/4) for new execution.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Map -> Sink: Unnamed (4/4) for new execution.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) switched from state RESTARTING to CREATED.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Status of the shared state registry of job b2bddb8ca16ac234d435cdecc01659fa after restore: SharedStateRegistry{registeredStates={}}.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Resetting the master hooks.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha  (b2bddb8ca16ac234d435cdecc01659fa) switched from state CREATED to RUNNING.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e509d37e69519c5126ce763f709b514f) switched from CREATED to SCHEDULED.
18:44:06.066 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl - Received slot request [SlotRequestId{36e8c23b1f554f12782ec6bd9fe7881f}] for task: Attempt #87 (Source: Custom Source (1/1)) @ (unassigned) - [SCHEDULED]
18:44:06.067 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create multi task slot [SlotRequestId{6ed9dce9aa3d0b63ac39aa64adeeb50c}] in slot [SlotRequestId{a80a7d860657fd3f2408d910a1948f4f}].
18:44:06.067 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create single task slot [SlotRequestId{36e8c23b1f554f12782ec6bd9fe7881f}] in multi task slot [SlotRequestId{6ed9dce9aa3d0b63ac39aa64adeeb50c}] for group bc764cd8ddf7a0cff126f51c16239658.
18:44:06.067 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (32c647da54b68216287e3af7897f1fe8) switched from CREATED to SCHEDULED.
18:44:06.067 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl - Received slot request [SlotRequestId{60d1d48b8d82f3a460984e1d71c511af}] for task: Attempt #87 (Map -> Sink: Unnamed (1/4)) @ (unassigned) - [SCHEDULED]
18:44:06.067 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create single task slot [SlotRequestId{60d1d48b8d82f3a460984e1d71c511af}] in multi task slot [SlotRequestId{6ed9dce9aa3d0b63ac39aa64adeeb50c}] for group 20ba6b65f97481d5570070de90e4e791.
18:44:06.067 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (f101383519c2b52acac6ce60d8335a89) switched from CREATED to SCHEDULED.
18:44:06.067 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl - Received slot request [SlotRequestId{fc96b58a7e13b5caaf0cb6568550cecc}] for task: Attempt #87 (Map -> Sink: Unnamed (2/4)) @ (unassigned) - [SCHEDULED]
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create multi task slot [SlotRequestId{9d4d30268f9725d6fcfefc6a01c2c182}] in slot [SlotRequestId{eda37c4f67dea8ff5a341c6048fdd896}].
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create single task slot [SlotRequestId{fc96b58a7e13b5caaf0cb6568550cecc}] in multi task slot [SlotRequestId{9d4d30268f9725d6fcfefc6a01c2c182}] for group 20ba6b65f97481d5570070de90e4e791.
18:44:06.068 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (3/4) (a4887b0224bde5500b7d1f3db22e2186) switched from CREATED to SCHEDULED.
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl - Received slot request [SlotRequestId{4de428154f254c8e5786987bb4a0bc29}] for task: Attempt #87 (Map -> Sink: Unnamed (3/4)) @ (unassigned) - [SCHEDULED]
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create multi task slot [SlotRequestId{5ea2907ae0aeca543dab8e8460a1a8ef}] in slot [SlotRequestId{a0e448e422666b9692c336a9270ecf72}].
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create single task slot [SlotRequestId{4de428154f254c8e5786987bb4a0bc29}] in multi task slot [SlotRequestId{5ea2907ae0aeca543dab8e8460a1a8ef}] for group 20ba6b65f97481d5570070de90e4e791.
18:44:06.068 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (4/4) (7c1942c8daac0ed36e33b5901eb67537) switched from CREATED to SCHEDULED.
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl - Received slot request [SlotRequestId{1df78274ae56dc2181d919d23373b187}] for task: Attempt #87 (Map -> Sink: Unnamed (4/4)) @ (unassigned) - [SCHEDULED]
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create multi task slot [SlotRequestId{4761a4a490ae2e10e1e677d9459747d0}] in slot [SlotRequestId{1a4b49f10dba1f9827543c80b2e74856}].
18:44:06.068 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager - Create single task slot [SlotRequestId{1df78274ae56dc2181d919d23373b187}] in multi task slot [SlotRequestId{4761a4a490ae2e10e1e677d9459747d0}] for group 20ba6b65f97481d5570070de90e4e791.
18:44:06.068 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e509d37e69519c5126ce763f709b514f) switched from SCHEDULED to DEPLOYING.
18:44:06.068 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/1) (attempt #87) to 7bd8e6bd-ed0d-4db4-89a3-13dbdfe20ea3 @ 127.0.0.1 (dataPort=-1)
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (32c647da54b68216287e3af7897f1fe8) switched from SCHEDULED to DEPLOYING.
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map -> Sink: Unnamed (1/4) (attempt #87) to 7bd8e6bd-ed0d-4db4-89a3-13dbdfe20ea3 @ 127.0.0.1 (dataPort=-1)
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (f101383519c2b52acac6ce60d8335a89) switched from SCHEDULED to DEPLOYING.
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map -> Sink: Unnamed (2/4) (attempt #87) to 7bd8e6bd-ed0d-4db4-89a3-13dbdfe20ea3 @ 127.0.0.1 (dataPort=-1)
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (3/4) (a4887b0224bde5500b7d1f3db22e2186) switched from SCHEDULED to DEPLOYING.
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map -> Sink: Unnamed (3/4) (attempt #87) to 7bd8e6bd-ed0d-4db4-89a3-13dbdfe20ea3 @ 127.0.0.1 (dataPort=-1)
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (4/4) (7c1942c8daac0ed36e33b5901eb67537) switched from SCHEDULED to DEPLOYING.
18:44:06.069 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map -> Sink: Unnamed (4/4) (attempt #87) to 7bd8e6bd-ed0d-4db4-89a3-13dbdfe20ea3 @ 127.0.0.1 (dataPort=-1)
18:44:06.069 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution e5733c10c04ba481354fe3d79af5ef6b.
18:44:06.069 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 22926cc493593a6a463e5f786cb2cd43.
18:44:06.069 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 4a02b51aa4dec540ee746206b06f785e.
18:44:06.069 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 80c7dad2388100174846eb73d5a4fbe9.
18:44:06.070 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution e5733c10c04ba481354fe3d79af5ef6b.
18:44:06.070 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 22926cc493593a6a463e5f786cb2cd43.
18:44:06.070 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 4a02b51aa4dec540ee746206b06f785e.
18:44:06.070 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 80c7dad2388100174846eb73d5a4fbe9.
18:44:06.071 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution e5733c10c04ba481354fe3d79af5ef6b.
18:44:06.071 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 22926cc493593a6a463e5f786cb2cd43.
18:44:06.071 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 4a02b51aa4dec540ee746206b06f785e.
18:44:06.071 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 80c7dad2388100174846eb73d5a4fbe9.
18:44:06.071 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution e5733c10c04ba481354fe3d79af5ef6b.
18:44:06.071 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 22926cc493593a6a463e5f786cb2cd43.
18:44:06.072 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 4a02b51aa4dec540ee746206b06f785e.
18:44:06.072 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 80c7dad2388100174846eb73d5a4fbe9.
18:44:06.072 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (1/4) - execution #86 to FAILED while being CANCELED.
18:44:06.072 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (2/4) - execution #86 to FAILED while being CANCELED.
18:44:06.072 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (3/4) - execution #86 to FAILED while being CANCELED.
18:44:06.072 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (4/4) - execution #86 to FAILED while being CANCELED.
18:44:06.073 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.076 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.077 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.078 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted
18:44:06.080 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.
	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547)
	... 25 common frames omitted

Process finished with exit code 130

目测 还行,至少离目标的距离缩短了,至少反馈打文件了哈;

对于   org.apache.kafka.common.config.ConfigException: Invalid value true for configuration auto.offset.reset: Expected value to be a string

处理方式 :  properties.put("auto.offset.reset", "latest");

我理解这个参数错误,要用最新的数据,而不是    properties.put("auto.offset.reset", "true");;

对于那个报什么文件已经存在,很简单,直接改代码;

singleOutputStreamOperator.writeAsText("D:\\temp\\flink.txt", FileSystem.WriteMode.OVERWRITE);

嚯嚯。。应该会好了吧。。。。最后那几个异常,估计是忘记打开Flink服务了,打开本地服务,继续。再没看到类似异常;

  • 六 其他异常
19:11:44.470 [Kafka 0.10 Fetcher for Source: Custom Source (1/1)] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=com.kafkaStudy] Offset commit failed on partition userTest-0 at offset 2500: The coordinator is not aware of this member.
19:11:44.474 [Kafka 0.10 Fetcher for Source: Custom Source (1/1)] WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka failed. This does not compromise Flink‘s checkpoints.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:900)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:539)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:267)
19:11:44.474 [Kafka 0.10 Fetcher for Source: Custom Source (1/1)] WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 failed async Kafka commit.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:900)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:539)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:267)

头皮麻啊,翻看半天资料。

于是我试试  https://www.jianshu.com/p/271f88f06eb3

反复调配如下参数:

#如果‘enable.auto.commit‘为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=60000
#心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000
spring.kafka.consumer.heartbeat-interval=3000
#一次调用poll()操作时返回的最大记录数,默认值为500
spring.kafka.consumer.max-poll-records=600

呵呵,无论怎么样调整,故障依旧,最后我仔细看这段:

kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用消费者的轮询方法。每次轮询到消息,在处理完这一批消息后,才会继续下一次轮询。但如果一次轮询返回的结构没办法及时处理完成,会有什么后果呢?服务端约定了和客户端max.poll.interval.ms,两次poll最大间隔。如果客户端处理一批消息花费的时间超过了这个限制时间,服务端可能就会把消费者客户端移除掉,并触发rebalance。

原来这样,我因为图省事(也是对Kafka原理不明晰),就把获取的值,第一页数据推入Kafka,很快就完成了;

而拉取的部分,执行时间肯定是在它后面

而根据上端文章反馈,Flink结合Kafka的运行机制就是,不停的运算,计算两次poll时间差,而我只推给Kafka一次数据,早就完成了。

所以运行时,Kafka认为已经 rebalance 了,异常出现;

于是,我在 Kafka 制造端,羞羞的加上一个循环,就是本文一开始的那个我最不屑的无限循环,不停的推入数据进Kafka。。。结果成功,异常不再,数据打印到本地了。

我勒个去,原来Flink喜欢“不要停”啊。

明天再接着调代码,脱掉那个“循环” ,一定要让Flink爽到位。

原文地址:https://www.cnblogs.com/alexgl2008/p/12391131.html

时间: 2024-10-11 09:14:33

整合 KAFKA+Flink 实例(第一部分,趟坑记录)的相关文章

开发者如何快速搭建本地 Kubernetes 集群?Minikube趟坑记录

1.背景 为啥要在本地搭建 Kubernetes 集群?因为开发者可以在本地快速验证自己实现的功能,接口.众所周知,由于 Kubernetes 部署较为复杂,使得广大开发者和运维人员学习和试用 Kubernetes 的门槛很高,光是部署一套 Kubernetes 集群,就需要部署大量的组件,花费精力较大.为了降低用户体验 Kubernetes 的门槛,Minikube 项目应运而生,它是 Github 上的一个开源项目,提供了一键安装的 Kubernetes 本地集群,支持 MacOS,Linu

spring整合kafka项目生产和消费测试结果记录(一)

使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者. 通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的接口,生产者将json数据发送到kafka集群, 消费者监听到kafka集群中的消息就开始消费,并将json解析成对象存到MySQL数据库. 下面是使用JMeter测试工具模拟100个并发的线程设置截图: 请求所发送的数据: 下面是100个用户10000个请求的聚合报告: 下面是生产者截图生产完10

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

activiti自定义流程之Spring整合activiti-modeler5.16实例(二):创建流程模型

注:(1)环境搭建:activiti自定义流程之Spring整合activiti-modeler5.16实例(一):环境搭建 1.maven导包,这里就没有什么多的好说了,直接代码: [html] view plain copy <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11&

storm 整合 kafka之保存MySQL数据库

整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理.实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置. 1.配置Maven依赖包 [ht

SpringBoot整合Kafka和Storm

前言 本篇文章主要介绍的是SpringBoot整合kafka和storm以及在这过程遇到的一些问题和解决方案. kafka和storm的相关知识 如果你对kafka和storm熟悉的话,这一段可以直接跳过!如果不熟,也可以看看我之前写的博客.一些相关博客如下. kafka 和 storm的环境安装 地址:http://www.panchengming.com/2018/01/26/pancm70/ kafka的相关使用 地址:http://www.panchengming.com/2018/01

SpringCloud+MyBatis+Redis整合—— 超详细实例(二)

2.SpringCloud+MyBatis+Redis redis是一种nosql数据库,以键值对<key,value>的形式存储数据,其速度相比于MySQL之类的数据库,相当于内存读写与硬盘读写的差别,所以常常用作缓存,用于少写多读的场景下,直接从缓存拿数据比从数据库(数据库要I/O操作)拿要快得多. 话不多说,接下来紧接上一章<SpringCloud+MyBatis+Redis整合-- 超详细实例(一)>搭建SpringCloud+MyBatis+Redis环境: 第一步:在p

Spark 系列(十六)—— Spark Streaming 整合 Kafka

一.版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher AP 状态 Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用

Spark Streaming整合Kafka

0)摘要 主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式.这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html). 1)Kafka准备 启动zookeeper ./zkServer.sh start 启动kafka ./kafka-server-star