Skip to content

Commit f9383e6

Browse files
kanyun-ruibinMartijnVisser
authored andcommitted
[FLINK-33863] Fix restoring compressed operator state
1 parent 9bb6fd3 commit f9383e6

File tree

2 files changed

+142
-2
lines changed

2 files changed

+142
-2
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232
import javax.annotation.Nonnull;
3333

3434
import java.io.IOException;
35+
import java.util.ArrayList;
3536
import java.util.Collection;
37+
import java.util.Comparator;
3638
import java.util.List;
3739
import java.util.Map;
40+
import java.util.stream.Collectors;
3841

3942
/** Implementation of operator state restore operation. */
4043
public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
@@ -168,9 +171,32 @@ public Void restore() throws Exception {
168171
}
169172
}
170173

174+
List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries =
175+
new ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet());
176+
177+
if (backendSerializationProxy.isUsingStateCompression()) {
178+
// sort state handles by offsets to avoid building SnappyFramedInputStream with
179+
// EOF stream.
180+
entries =
181+
entries.stream()
182+
.sorted(
183+
Comparator.comparingLong(
184+
entry -> {
185+
OperatorStateHandle.StateMetaInfo
186+
stateMetaInfo = entry.getValue();
187+
long[] offsets = stateMetaInfo.getOffsets();
188+
if (offsets == null
189+
|| offsets.length == 0) {
190+
return Long.MIN_VALUE;
191+
} else {
192+
return offsets[0];
193+
}
194+
}))
195+
.collect(Collectors.toList());
196+
}
197+
171198
// Restore all the states
172-
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
173-
stateHandle.getStateNameToPartitionOffsets().entrySet()) {
199+
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : entries) {
174200

175201
final String stateName = nameToOffsets.getKey();
176202

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.state;
20+
21+
import org.apache.flink.api.common.ExecutionConfig;
22+
import org.apache.flink.api.common.state.BroadcastState;
23+
import org.apache.flink.api.common.state.ListStateDescriptor;
24+
import org.apache.flink.api.common.state.MapStateDescriptor;
25+
import org.apache.flink.core.fs.CloseableRegistry;
26+
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
27+
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
28+
29+
import org.junit.jupiter.api.Test;
30+
31+
import javax.annotation.Nullable;
32+
33+
import java.util.Arrays;
34+
import java.util.Collections;
35+
import java.util.HashMap;
36+
import java.util.List;
37+
38+
/** Tests for the {@link org.apache.flink.runtime.state.OperatorStateRestoreOperation}. */
39+
public class OperatorStateRestoreOperationTest {
40+
41+
@Nullable
42+
private static OperatorStateHandle createOperatorStateHandle(
43+
ExecutionConfig cfg,
44+
CloseableRegistry cancelStreamRegistry,
45+
ClassLoader classLoader,
46+
List<String> stateNames,
47+
List<String> broadcastStateNames)
48+
throws Exception {
49+
50+
try (OperatorStateBackend operatorStateBackend =
51+
new DefaultOperatorStateBackendBuilder(
52+
classLoader,
53+
cfg,
54+
false,
55+
Collections.emptyList(),
56+
cancelStreamRegistry)
57+
.build()) {
58+
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
59+
60+
for (String stateName : stateNames) {
61+
ListStateDescriptor<String> descriptor =
62+
new ListStateDescriptor<>(stateName, String.class);
63+
PartitionableListState<String> state =
64+
(PartitionableListState<String>)
65+
operatorStateBackend.getListState(descriptor);
66+
state.add("value1");
67+
}
68+
69+
for (String broadcastStateName : broadcastStateNames) {
70+
MapStateDescriptor<String, String> descriptor =
71+
new MapStateDescriptor<>(broadcastStateName, String.class, String.class);
72+
BroadcastState<String, String> state =
73+
operatorStateBackend.getBroadcastState(descriptor);
74+
state.put("key1", "value1");
75+
}
76+
77+
SnapshotResult<OperatorStateHandle> result =
78+
operatorStateBackend
79+
.snapshot(
80+
1,
81+
1,
82+
streamFactory,
83+
CheckpointOptions.forCheckpointWithDefaultLocation())
84+
.get();
85+
return result.getJobManagerOwnedSnapshot();
86+
}
87+
}
88+
89+
@Test
90+
public void testRestoringMixedOperatorStateWhenSnapshotCompressionIsEnabled() throws Exception {
91+
ExecutionConfig cfg = new ExecutionConfig();
92+
cfg.setUseSnapshotCompression(true);
93+
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
94+
ClassLoader classLoader = this.getClass().getClassLoader();
95+
96+
OperatorStateHandle handle =
97+
createOperatorStateHandle(
98+
cfg,
99+
cancelStreamRegistry,
100+
classLoader,
101+
Arrays.asList("s1", "s2"),
102+
Collections.singletonList("b2"));
103+
104+
OperatorStateRestoreOperation operatorStateRestoreOperation =
105+
new OperatorStateRestoreOperation(
106+
cancelStreamRegistry,
107+
classLoader,
108+
new HashMap<>(),
109+
new HashMap<>(),
110+
Collections.singletonList(handle));
111+
112+
operatorStateRestoreOperation.restore();
113+
}
114+
}

0 commit comments

Comments
 (0)