
Flink Troubleshooting Notes

1. Errors when running locally

1. Memory manager has been shut down. (org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.)

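Symptom: the streaming job started and processed data normally at first, but stalled after a few seconds. Debugging and log analysis showed that the MiniCluster had been shut down.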

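However, a colleague could run the same code with the same dependencies, so it had to be an environment problem, though it was not clear exactly where.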

Cause: the JDK version. Even different update releases of JDK 1.8 can behave differently; mine was 1.8.0_25.

Switching to 1.8.0_162 fixed it!
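Because the difference came down to the JDK 8 update level, it helps to confirm which JDK the IDE actually launches the local job with. The class below is a hypothetical helper, a minimal sketch that reads the standard `java.version` system property and flags JDK 8 updates older than 162. The threshold 162 is only the version that worked in this case, not an official Flink minimum.

```java
/**
 * Hypothetical helper (not part of Flink): prints the running JDK version and
 * warns when it is a JDK 8 update older than the one that worked in this write-up.
 */
public class JdkUpdateCheck {

    // Known-good update from this case only; NOT an official Flink requirement.
    static final int KNOWN_GOOD_UPDATE = 162;

    /** Returns NN for a version string like "1.8.0_NN", or -1 if it is not a JDK 8 version string. */
    static int updateNumber(String version) {
        if (version == null || !version.startsWith("1.8.0_")) {
            return -1;
        }
        String rest = version.substring("1.8.0_".length());
        // Keep only the leading digits, ignoring suffixes such as "-b12" or "-ea".
        int end = 0;
        while (end < rest.length() && Character.isDigit(rest.charAt(end))) {
            end++;
        }
        return end == 0 ? -1 : Integer.parseInt(rest.substring(0, end));
    }

    /** True only for JDK 8 versions whose update number is below the known-good one. */
    static boolean olderThanKnownGood(String version) {
        int update = updateNumber(version);
        return update >= 0 && update < KNOWN_GOOD_UPDATE;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        System.out.println("java.version = " + version);
        if (olderThanKnownGood(version)) {
            System.out.println("Warning: JDK 8 update " + updateNumber(version)
                    + " is older than 1.8.0_" + KNOWN_GOOD_UPDATE
                    + ", the update that resolved the MiniCluster shutdown in this case.");
        }
    }
}
```

Running this with the same run configuration as the Flink job shows whether the IDE is really using the JDK you expect, which is easy to get wrong when several JDK 8 updates are installed.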

Error output before a logging package was added to the project:

[Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
	at java.lang.Thread.run(Thread.java:745)
[Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
	at java.lang.Thread.run(Thread.java:745)


Error log with INFO-level logging enabled:

[main] INFO org.apache.flink.runtime.minicluster.MiniCluster - Shutting down Flink Mini Cluster
[main] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest endpoint.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory C:\Users\zhoujie\AppData\Local\Temp\flink-io-d025a7f2-95de-47e8-b13a-cf8abbe67ccf
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map -> Sink: Unnamed (1/1) (3863b23bd83cd0e8fd6b38f7ed543d5c) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map -> Sink: Unnamed (1/1) (3863b23bd83cd0e8fd6b38f7ed543d5c) switched from CANCELING to CANCELED.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) if no longer possible.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ForkJoinPool.commonPool-worker-9] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory C:\Users\zhoujie\AppData\Local\Temp\flink-web-ui
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) because the restart strategy prevented it.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ForkJoinPool.commonPool-worker-9] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 018d08d18bbbca742a9eeabcb2dcdd07.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Closing the SlotManager.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Suspending the SlotManager.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection a60ed1ebdf3f91a90bb6f837a2956c32: ResourceManager leader changed to new address null.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
[Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2) switched from RUNNING to FAILED.
java.lang.RuntimeException: Buffer pool is destroyed.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:236)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:229)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:101)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 34 more
[Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
[Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
	at java.lang.Thread.run(Thread.java:745)
[Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
	at java.lang.Thread.run(Thread.java:745)
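The log lines above use the `[thread] LEVEL logger - message` layout, which matches the default output of slf4j-simple. Assuming that is the logging binding that was added (the original does not name the package), the level and timestamps can be set in a `simplelogger.properties` file on the classpath, for example under `src/main/resources`. This is a sketch of that configuration, not the author's actual file.

```properties
# Hypothetical src/main/resources/simplelogger.properties (assumes slf4j-simple is the SLF4J binding)
# Default level for all loggers
org.slf4j.simpleLogger.defaultLogLevel=info
# Per-logger override for Flink runtime classes
org.slf4j.simpleLogger.log.org.apache.flink=info
# Timestamps make it easier to see when the MiniCluster started shutting down
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS
```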

