Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class SplitEnumeratorCheckpointSerializer
implements SimpleVersionedSerializer<SplitEnumeratorCheckpoint> {
@Override
public int getVersion() {
return 0;
}

@Override
public byte[] serialize(SplitEnumeratorCheckpoint message) throws IOException {
return message.toByteArray();
}

@Override
public SplitEnumeratorCheckpoint deserialize(int i, byte[] bytes) throws IOException {
return SplitEnumeratorCheckpoint.parseFrom(bytes);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mind just making one of these for protos? "BackwardsCompatibleProtoSerializer"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent a bit trying to do this. It sounds like the right thing to do, but the results weren't great.

We end up needing two deserializers anyway because SubscriptionPartitionSplit (the other type needing a serializer) isn't a proto, it's just serialized using a proto, so you end up with a serializer that takes a proto serializer and proxies.

Also type erased generics make the implementation awkward because you have to pass ProtoType::parseFrom into the generic proto serializer constructor as a lambda.

I think this is simpler as is

}
23 changes: 23 additions & 0 deletions src/main/proto/checkpoints.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,27 @@ message SubscriptionPartitionSplitProto {
int64 partition = 2;
// The cursor for this split.
Cursor start = 3;
}

message SplitEnumeratorCheckpoint {
message Subtask {
int32 id = 1;
}
message Discovery {
// The subscription for which to discover splits.
string subscription = 1;
// The topic corresponding to the subscription.
string topic = 2;
}
message Assignment {
// The subtask this split is assigned to. Not present if the
// split is unassigned.
Subtask subtask = 1;
// The split.
SubscriptionPartitionSplitProto split = 2;
}
// State for split discovery.
Discovery discovery = 1;
// Discovered splits, and their assignment state.
repeated Assignment assignments = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SplitEnumeratorCheckpointSerializerTest {

@Test
public void testSerialization() throws IOException {
SplitEnumeratorCheckpoint proto =
SplitEnumeratorCheckpoint.newBuilder()
.setDiscovery(SplitEnumeratorCheckpoint.Discovery.newBuilder().setSubscription("sub"))
.build();
SplitEnumeratorCheckpointSerializer serializer = new SplitEnumeratorCheckpointSerializer();
assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(proto)))
.isEqualTo(proto);
}
}