-
Notifications
You must be signed in to change notification settings - Fork 73
Closed
Labels
affectVersion/0.1.0The bug affects the 0.1.0 version. The features are not needed.The bug affects the 0.1.0 version. The features are not needed.bug[Issue Type] Something isn't working as expected.[Issue Type] Something isn't working as expected.fixVersion/0.1.1The feature or bug should be implemented/fixed in the 0.1.1 version.The feature or bug should be implemented/fixed in the 0.1.1 version.fixVersion/0.2.0The feature or bug should be implemented/fixed in the 0.2.0 version.The feature or bug should be implemented/fixed in the 0.2.0 version.priority/majorDefault priority of the PR or issue.Default priority of the PR or issue.
Milestone
Description
Search before asking
- I searched in the issues and found nothing similar.
Description
PythonActionTask failed to serialize for Flink state.
Traceback (most recent call last):
File "/Users/sxnan/workspace/apache/flink-agents/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py", line 84, in <module>
agents_env.execute()
File "/Users/sxnan/workspace/apache/flink-agents/python/flink_agents/runtime/remote_execution_environment.py", line 269, in execute
self.__env.execute()
File "/Users/sxnan/workspace/apache/flink-agents/python/.venv/lib/python3.11/site-packages/pyflink/datastream/stream_execution_environment.py", line 830, in execute
return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sxnan/workspace/apache/flink-agents/python/.venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
^^^^^^^^^^^^^^^^^
File "/Users/sxnan/workspace/apache/flink-agents/python/.venv/lib/python3.11/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "/Users/sxnan/workspace/apache/flink-agents/python/.venv/lib/python3.11/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o19.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1315)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:61)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:788)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:765)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
... 5 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: jdk.internal.loader.ClassLoaders$AppClassLoader
Serialization trace:
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.core.classloading.SubmoduleClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader)
contextClassLoader (pemja.core.PythonInterpreter$MainInterpreter$1)
thread (pemja.core.PythonInterpreter$MainInterpreter)
mainInterpreter (pemja.core.PythonInterpreter)
interpreter (org.apache.flink.agents.runtime.python.utils.PythonActionExecutor)
pythonActionExecutor (org.apache.flink.agents.runtime.python.operator.PythonActionTask)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:356)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:183)
at org.apache.flink.contrib.streaming.state.RocksDBListState.add(RocksDBListState.java:130)
at org.apache.flink.runtime.state.UserFacingListState.add(UserFacingListState.java:51)
at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.processEvent(ActionExecutionOperator.java:325)
at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.processElement(ActionExecutionOperator.java:293)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:64)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: jdk.internal.loader.ClassLoaders$AppClassLoader
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
... 59 more
Caused by: java.lang.reflect.InvocationTargetException
at jdk.internal.reflect.GeneratedConstructorAccessor24.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
... 69 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field final jdk.internal.loader.URLClassPath jdk.internal.loader.ClassLoaders$AppClassLoader.ucp accessible: module java.base does not "opens jdk.internal.loader" to unnamed module @5c9c8b16
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:340)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:280)
at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:176)
at java.base/java.lang.reflect.Field.setAccessible(Field.java:170)
at com.esotericsoftware.kryo.serializers.FieldSerializer.buildValidFields(FieldSerializer.java:282)
at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:217)
at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
... 73 more
How to reproduce
Submit a Flink Agents job with checkpoint enabled.
Version and environment
Flink 1.20.3
Flink Agents 0.1.0
Are you willing to submit a PR?
- I'm willing to submit a PR!
Metadata
Metadata
Assignees
Labels
affectVersion/0.1.0The bug affects the 0.1.0 version. The features are not needed.The bug affects the 0.1.0 version. The features are not needed.bug[Issue Type] Something isn't working as expected.[Issue Type] Something isn't working as expected.fixVersion/0.1.1The feature or bug should be implemented/fixed in the 0.1.1 version.The feature or bug should be implemented/fixed in the 0.1.1 version.fixVersion/0.2.0The feature or bug should be implemented/fixed in the 0.2.0 version.The feature or bug should be implemented/fixed in the 0.2.0 version.priority/majorDefault priority of the PR or issue.Default priority of the PR or issue.