Open
Description
Thanks for you work, it seems SlidingWindow doesn't support restore state from CheckPoint
2023-10-07 16:22:31
java.io.IOException: Exception while applying AggregateFunction in aggregating state
at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:404)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:942)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:578)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Serializer does not support named field positions.
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyNameBased(RowSerializer.java:173)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.transform(CopyOnWriteStateMap.java:375)
at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:211)
at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
... 14 more
I guess it is caused by we created an name-based
row in aggregation function
but flink row serializer doesn't support name-based row.
Metadata
Metadata
Assignees
Labels
No labels