Skip to content

Commit e4649d5

Browse files
feat: Add the checkpoint and serializer for the split enumerator (#12)
* SplitEnumerator checkpoint * fixes
1 parent 8b63ec4 commit e4649d5

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.enumerator;
17+
18+
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
19+
import java.io.IOException;
20+
import org.apache.flink.core.io.SimpleVersionedSerializer;
21+
22+
public class SplitEnumeratorCheckpointSerializer
23+
implements SimpleVersionedSerializer<SplitEnumeratorCheckpoint> {
24+
@Override
25+
public int getVersion() {
26+
return 0;
27+
}
28+
29+
@Override
30+
public byte[] serialize(SplitEnumeratorCheckpoint message) throws IOException {
31+
return message.toByteArray();
32+
}
33+
34+
@Override
35+
public SplitEnumeratorCheckpoint deserialize(int i, byte[] bytes) throws IOException {
36+
return SplitEnumeratorCheckpoint.parseFrom(bytes);
37+
}
38+
}

src/main/proto/checkpoints.proto

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,27 @@ message SubscriptionPartitionSplitProto {
3232
int64 partition = 2;
3333
// The cursor for this split.
3434
Cursor start = 3;
35+
}
36+
37+
message SplitEnumeratorCheckpoint {
38+
message Subtask {
39+
int32 id = 1;
40+
}
41+
message Discovery {
42+
// The subscription for which to discover splits.
43+
string subscription = 1;
44+
// The topic corresponding to the subscription.
45+
string topic = 2;
46+
}
47+
message Assignment {
48+
// The subtask this split is assigned to. Not present if the
49+
// split is unassigned.
50+
Subtask subtask = 1;
51+
// The split.
52+
SubscriptionPartitionSplitProto split = 2;
53+
}
54+
// State for split discovery.
55+
Discovery discovery = 1;
56+
// Discovered splits, and their assignment state.
57+
repeated Assignment assignments = 2;
3558
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.enumerator;
17+
18+
import static com.google.common.truth.Truth.assertThat;
19+
20+
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
21+
import java.io.IOException;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.mockito.junit.MockitoJUnitRunner;
25+
26+
@RunWith(MockitoJUnitRunner.class)
27+
public class SplitEnumeratorCheckpointSerializerTest {
28+
29+
@Test
30+
public void testSerialization() throws IOException {
31+
SplitEnumeratorCheckpoint proto =
32+
SplitEnumeratorCheckpoint.newBuilder()
33+
.setDiscovery(SplitEnumeratorCheckpoint.Discovery.newBuilder().setSubscription("sub"))
34+
.build();
35+
SplitEnumeratorCheckpointSerializer serializer = new SplitEnumeratorCheckpointSerializer();
36+
assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(proto)))
37+
.isEqualTo(proto);
38+
}
39+
}

0 commit comments

Comments
 (0)