[Bug] Flink native k8s application mode recovery failed from S3(s3p) savepoint #3013

Open
3 tasks done
lukeyan2023 opened this issue Sep 2, 2023 · 6 comments
Labels: bug (Something isn't working)

Comments

lukeyan2023 commented Sep 2, 2023

Search before asking

  • I had searched in the issues and found no similar issues.

Java Version

1.8

Scala Version

2.12.x

StreamPark Version

2.1.1

Flink Version

1.17.1

deploy mode

kubernetes-application

What happened

Flink failed to recover from a savepoint that StreamPark had saved automatically. The StreamPark logs show that the savepoint path submitted when restoring the Flink job was s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f.
The Flink application logs show that Flink hit an error when restoring from that savepoint, s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f.
Afterwards, manually submitting the job with the same savepoint path through the regular Flink CLI produced the same error.

However, after changing the savepoint path to s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f/_metadata, submissions through both the CLI and StreamPark succeed.
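
The workaround described above amounts to pointing the restore at the `_metadata` file instead of the savepoint directory. A minimal Python sketch of that path rewrite follows. It is only an illustration of the idea, not StreamPark or Flink code, and the helper name `to_metadata_path` is hypothetical.

```python
METADATA_FILE = "_metadata"  # file name taken from the working path in this report


def to_metadata_path(savepoint_path: str) -> str:
    """Return the savepoint path with /_metadata appended if it is missing.

    Hypothetical helper for illustration only.
    """
    trimmed = savepoint_path.rstrip("/")
    if trimmed.endswith("/" + METADATA_FILE):
        # Already points at the metadata file; leave it alone.
        return trimmed
    return trimmed + "/" + METADATA_FILE


if __name__ == "__main__":
    directory = (
        "s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/"
        "savepoint-2b3ed0-f0c7ba51791f"
    )
    print(to_metadata_path(directory))
```

The helper is idempotent, so applying it to a path that already ends in `/_metadata` changes nothing.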

Error Exception

The error log of Flink is as follows:

2023-09-02 02:57:38,909 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2023-09-02 02:57:38,912 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting job ff31d8dfb89d0a2e3fdf618617af15bc from savepoint s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f ()
2023-09-02 02:57:39,360 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job ff31d8dfb89d0a2e3fdf618617af15bc reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
	... 3 more
Caused by: java.lang.RuntimeException: java.io.EOFException
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	... 3 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113)
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849)
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223)
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
	... 4 more
2023-09-02 02:57:39,395 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job ff31d8dfb89d0a2e3fdf618617af15bc has been registered for cleanup in the JobResultStore after reaching a terminal state.
2023-09-02 02:57:40,108 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly: 
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_382]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_382]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_382]
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_382]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_382]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_382]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_382]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
	... 12 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
	... 12 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Platform-Flink-Test-security-Log'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
	... 12 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.lang.RuntimeException: java.io.EOFException
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_382]
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
2023-09-02 02:57:40,132 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_382]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_382]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_382]
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_382]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_382]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_382]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_382]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
	... 12 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
	... 12 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Platform-Flink-Test-security-Log'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
	... 12 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_382]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.lang.RuntimeException: java.io.EOFException
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_382]
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
2023-09-02 02:57:40,145 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-09-02 02:57:40,145 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
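
The innermost cause in the log above is a java.io.EOFException thrown by DataInputStream.readInt, called from Checkpoints.loadCheckpointMetadata. Java's readInt reads four bytes as a big-endian integer and throws EOFException when the stream ends first, so the stream opened for the savepoint path held fewer than four bytes. The Python sketch below mimics that behavior as an analogy. It is not Flink code, `read_int` is a hypothetical name, and whether the directory path resolved to an empty object on S3 is an assumption the log does not confirm.

```python
import io
import struct
from typing import BinaryIO


def read_int(stream: BinaryIO) -> int:
    """Read a 4-byte big-endian signed int, similar to DataInputStream.readInt."""
    data = stream.read(4)
    if len(data) < 4:
        # Corresponds to the EOFException in the stack trace.
        raise EOFError("stream ended before 4 bytes could be read")
    return struct.unpack(">i", data)[0]


if __name__ == "__main__":
    print(read_int(io.BytesIO(bytes([0, 0, 0, 42]))))
    try:
        read_int(io.BytesIO(b""))  # an empty stream fails on the first read
    except EOFError as exc:
        print("EOF:", exc)
```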

Screenshots

(two screenshots attached)

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

wolfboys (Member) commented Sep 2, 2023

Thank you for the detailed feedback. We will work quickly to identify and fix this bug. 💪🔧

@wolfboys wolfboys added the bug Something isn't working label Sep 2, 2023
@lukeyan2023
Author

image

image

The two screenshots above are from the official Flink 1.17.1 documentation.

In actual testing, the savepoint path in Figure 2 has to use format 2 from Figure 1 to work. Using format 1 from Figure 1 fails, which is the cause of the current error.

From reviewing the source code, I understand the process to be as follows:

First, a savepoint is triggered, then StreamPark obtains the generated savepoint path from Flink and writes it to the database.
Second, when restoring from a savepoint, StreamPark reads the savepoint path from the database.

So to fix this issue, should we store format 2 from Figure 1 when obtaining the savepoint path from Flink, so that the code in other places doesn't need to change?
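As a rough illustration of that idea, here is a minimal sketch that converts a savepoint directory path into the path of its `_metadata` file before it is stored. It assumes format 2 is the path ending in `/_metadata`, as in the issue description. The class `SavepointPaths` and the method `toMetadataPath` are hypothetical names for illustration and are not part of StreamPark or Flink.

```java
// Hypothetical helper, not StreamPark or Flink code.
final class SavepointPaths {
    private static final String METADATA_FILE = "_metadata";

    private SavepointPaths() {}

    /**
     * Returns the path of the _metadata file for a savepoint.
     * A path that already ends in /_metadata is returned unchanged,
     * so calling this twice has no further effect.
     */
    static String toMetadataPath(String savepointPath) {
        String trimmed = savepointPath;
        // Drop trailing slashes so we never produce "dir//_metadata".
        while (trimmed.endsWith("/")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        if (trimmed.endsWith("/" + METADATA_FILE)) {
            return trimmed;
        }
        return trimmed + "/" + METADATA_FILE;
    }
}
```

Because the helper leaves an existing `/_metadata` suffix alone, it could be applied both when the path is written to the database and when it is read back, without changing paths that are already in format 2.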

@wolfboys
Member

wolfboys commented Sep 2, 2023

Thank you for providing the information. We encourage you to fix this bug, how about it? 💪 We need to test and determine the savepoint path rules under different versions of Flink (1.12 ~ 1.17). We warmly welcome you to fix this bug. And we believe you can do it! 👍😊

@lukeyan2023
Author

I am willing to fix this bug and am currently reading the relevant code, but due to my limited abilities, it may take some time.

@lukeyan2023
Author

image

After reviewing the relevant source code and the official Flink documentation, I believe the correct savepoint format is format 1 in the screenshot.

So I think this is a problem with Flink, not StreamPark. To verify this, I ran the following tests:

Flink version: 1.17.1

  1. Store the savepoint on HDFS and restore the job using format 1 from the screenshot. Result: succeeded.
  2. Store the savepoint on S3 with the s3a scheme and restore the job using format 1 from the screenshot. Result: succeeded.
  3. Store the savepoint on S3 with the s3p scheme and restore the job using format 1 from the screenshot. Result: failed.
  4. Store the savepoint on S3 with the s3p scheme and restore the job using format 2 from the screenshot. Result: succeeded.

In summary, Flink savepoint restore appears to behave unexpectedly when the savepoint is stored on S3 and accessed through the s3p scheme.

If this is intended behavior in Flink, then StreamPark may need to handle this scenario specifically.
If it is not intended, the bug should be reported to the Flink community.
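If StreamPark did special-case this scenario, the four test results above could be encoded as a check on the URI scheme. This is only a sketch under the assumption that the s3p scheme is the only affected one, which was observed on Flink 1.17.1 only. `SavepointSchemeCheck` and `needsMetadataSuffix` are hypothetical names, not existing StreamPark or Flink APIs.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical helper, not StreamPark or Flink code.
final class SavepointSchemeCheck {
    private SavepointSchemeCheck() {}

    /**
     * Returns true when the path uses the s3p scheme and does not yet
     * point at the _metadata file (test 3 above). HDFS and s3a paths
     * (tests 1 and 2) and s3p paths already in format 2 (test 4)
     * return false.
     */
    static boolean needsMetadataSuffix(String savepointPath) {
        // URI.create throws IllegalArgumentException on malformed input,
        // for example a path containing spaces.
        String scheme = URI.create(savepointPath).getScheme();
        return scheme != null
                && scheme.toLowerCase(Locale.ROOT).equals("s3p")
                && !savepointPath.endsWith("/_metadata");
    }
}
```

Keeping the decision in one predicate would make it easy to extend or remove once the behavior across other Flink versions is confirmed.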

@wolfboys

@lukeyan2023 lukeyan2023 changed the title [Bug] Flink native k8s application mode recovery failed from S3 savepoint [Bug] Flink native k8s application mode recovery failed from S3(s3p) savepoint Sep 3, 2023
@wolfboys
Member

wolfboys commented Sep 9, 2023

Sorry for taking so long to get back to you. Based on your description, our preliminary suspicion is that this may be a bug in Flink, but we need further confirmation. If it is confirmed, we can report it to the Flink community.
