Skip to content

Commit b68d2b4

Browse files
feat: Add SplitDiscovery for use in split enumerator (#14)
* SplitDiscovery updated * respond to comments * test fix and formattiong * fix comments
1 parent 2c407e9 commit b68d2b4

File tree

3 files changed

+326
-0
lines changed

3 files changed

+326
-0
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.enumerator;
17+
18+
import com.google.api.gax.rpc.ApiException;
19+
import com.google.cloud.pubsublite.*;
20+
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
21+
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
22+
import com.google.cloud.pubsublite.internal.CursorClient;
23+
import com.google.cloud.pubsublite.internal.ExtractStatus;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.TreeSet;
30+
31+
public class SingleSubscriptionSplitDiscovery implements SplitDiscovery {
32+
private final AdminClient adminClient;
33+
private final CursorClient cursorClient;
34+
private final TopicPath topicPath;
35+
private final SubscriptionPath subscriptionPath;
36+
private long partitionCount;
37+
38+
private SingleSubscriptionSplitDiscovery(
39+
AdminClient adminClient,
40+
CursorClient cursorClient,
41+
TopicPath topicPath,
42+
SubscriptionPath subscriptionPath,
43+
long partitionCount) {
44+
this.adminClient = adminClient;
45+
this.cursorClient = cursorClient;
46+
this.topicPath = topicPath;
47+
this.subscriptionPath = subscriptionPath;
48+
this.partitionCount = partitionCount;
49+
}
50+
51+
static SingleSubscriptionSplitDiscovery create(
52+
AdminClient adminClient,
53+
CursorClient cursorClient,
54+
TopicPath topicPath,
55+
SubscriptionPath subscriptionPath) {
56+
return new SingleSubscriptionSplitDiscovery(
57+
adminClient, cursorClient, topicPath, subscriptionPath, 0L);
58+
}
59+
60+
static SingleSubscriptionSplitDiscovery fromCheckpoint(
61+
SplitEnumeratorCheckpoint.Discovery proto,
62+
Collection<SubscriptionPartitionSplit> currentSplits,
63+
AdminClient adminClient,
64+
CursorClient cursorClient) {
65+
SubscriptionPath subscriptionPath = SubscriptionPath.parse(proto.getSubscription());
66+
TopicPath topicPath = TopicPath.parse(proto.getTopic());
67+
Set<Long> partitions = new TreeSet<>();
68+
for (SubscriptionPartitionSplit s : currentSplits) {
69+
if (!s.subscriptionPath().equals(subscriptionPath)) {
70+
throw new IllegalStateException(
71+
"Split discovery configured with subscription "
72+
+ subscriptionPath
73+
+ " but current splits contains a split from subscription "
74+
+ s);
75+
}
76+
partitions.add(s.partition().value());
77+
}
78+
long partitionCount = partitions.size();
79+
for (long p = 0; p < partitions.size(); p++) {
80+
if (!partitions.contains(p)) {
81+
throw new IllegalStateException(
82+
"Split set is not continuous, missing split for partition " + p + " " + currentSplits);
83+
}
84+
}
85+
return new SingleSubscriptionSplitDiscovery(
86+
adminClient, cursorClient, topicPath, subscriptionPath, partitionCount);
87+
}
88+
89+
public synchronized List<SubscriptionPartitionSplit> discoverNewSplits() throws ApiException {
90+
try {
91+
List<SubscriptionPartitionSplit> newSplits = new ArrayList<>();
92+
long newPartitionCount = adminClient.getTopicPartitionCount(topicPath).get();
93+
if (newPartitionCount == partitionCount) {
94+
return newSplits;
95+
}
96+
Map<Partition, Offset> cursorMap = cursorClient.listPartitionCursors(subscriptionPath).get();
97+
for (long p = partitionCount; p < newPartitionCount; p++) {
98+
Partition partition = Partition.of(p);
99+
Offset offset = cursorMap.getOrDefault(partition, Offset.of(0));
100+
newSplits.add(SubscriptionPartitionSplit.create(subscriptionPath, partition, offset));
101+
}
102+
partitionCount = newPartitionCount;
103+
return newSplits;
104+
} catch (Throwable t) {
105+
throw ExtractStatus.toCanonical(t).underlying;
106+
}
107+
}
108+
109+
public synchronized SplitEnumeratorCheckpoint.Discovery checkpoint() {
110+
return SplitEnumeratorCheckpoint.Discovery.newBuilder()
111+
.setSubscription(subscriptionPath.toString())
112+
.setTopic(topicPath.toString())
113+
.build();
114+
}
115+
116+
@Override
117+
public synchronized void close() {
118+
try (AdminClient a = adminClient;
119+
CursorClient c = cursorClient) {}
120+
}
121+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.enumerator;
17+
18+
import com.google.api.gax.rpc.ApiException;
19+
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
20+
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
21+
import java.util.List;
22+
23+
interface SplitDiscovery extends AutoCloseable {
24+
List<SubscriptionPartitionSplit> discoverNewSplits() throws ApiException;
25+
26+
SplitEnumeratorCheckpoint.Discovery checkpoint();
27+
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.enumerator;
17+
18+
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleSubscriptionPath;
19+
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleTopicPath;
20+
import static com.google.common.truth.Truth.assertThat;
21+
import static org.junit.Assert.assertThrows;
22+
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.when;
24+
25+
import com.google.api.core.ApiFutures;
26+
import com.google.api.gax.rpc.ApiException;
27+
import com.google.api.gax.rpc.StatusCode;
28+
import com.google.cloud.pubsublite.AdminClient;
29+
import com.google.cloud.pubsublite.Offset;
30+
import com.google.cloud.pubsublite.Partition;
31+
import com.google.cloud.pubsublite.SubscriptionPath;
32+
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
33+
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
34+
import com.google.cloud.pubsublite.internal.CheckedApiException;
35+
import com.google.cloud.pubsublite.internal.CursorClient;
36+
import com.google.common.collect.ImmutableList;
37+
import com.google.common.collect.ImmutableMap;
38+
import java.util.List;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
import org.junit.runner.RunWith;
42+
import org.mockito.Mock;
43+
import org.mockito.junit.MockitoJUnitRunner;
44+
45+
@RunWith(MockitoJUnitRunner.class)
46+
public class SingleSubscriptionSplitDiscoveryTest {
47+
48+
@Mock CursorClient mockCursorClient;
49+
@Mock AdminClient mockAdminClient;
50+
51+
SplitDiscovery discovery;
52+
53+
@Before
54+
public void setUp() {
55+
discovery =
56+
SingleSubscriptionSplitDiscovery.create(
57+
mockAdminClient, mockCursorClient, exampleTopicPath(), exampleSubscriptionPath());
58+
}
59+
60+
@Test
61+
public void testDiscovery() {
62+
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
63+
.thenReturn(ApiFutures.immediateFuture(2L));
64+
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
65+
.thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
66+
List<SubscriptionPartitionSplit> splits = discovery.discoverNewSplits();
67+
assertThat(splits)
68+
.containsExactly(
69+
SubscriptionPartitionSplit.create(
70+
exampleSubscriptionPath(), Partition.of(0), Offset.of(0)),
71+
SubscriptionPartitionSplit.create(
72+
exampleSubscriptionPath(), Partition.of(1), Offset.of(2)));
73+
}
74+
75+
@Test
76+
public void testDiscovery_Incremental() {
77+
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
78+
.thenReturn(ApiFutures.immediateFuture(2L))
79+
.thenReturn(ApiFutures.immediateFuture(3L))
80+
.thenReturn(ApiFutures.immediateFuture(3L));
81+
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
82+
.thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
83+
assertThat(discovery.discoverNewSplits()).hasSize(2);
84+
assertThat(discovery.discoverNewSplits()).hasSize(1);
85+
assertThat(discovery.discoverNewSplits()).hasSize(0);
86+
}
87+
88+
@Test
89+
public void testDiscovery_AdminFailure() {
90+
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
91+
.thenReturn(
92+
ApiFutures.immediateFailedFuture(
93+
new CheckedApiException("", StatusCode.Code.INTERNAL)));
94+
assertThrows(ApiException.class, () -> discovery.discoverNewSplits());
95+
}
96+
97+
@Test
98+
public void testDiscovery_CursorFailure() {
99+
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
100+
.thenReturn(ApiFutures.immediateFuture(2L));
101+
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
102+
.thenReturn(
103+
ApiFutures.immediateFailedFuture(
104+
new CheckedApiException("", StatusCode.Code.INTERNAL)));
105+
assertThrows(ApiException.class, () -> discovery.discoverNewSplits());
106+
}
107+
108+
@Test
109+
public void testCheckpoint() {
110+
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
111+
assertThat(proto.getSubscription()).isEqualTo(exampleSubscriptionPath().toString());
112+
assertThat(proto.getTopic()).isEqualTo(exampleTopicPath().toString());
113+
}
114+
115+
@Test
116+
public void testCheckpointRestore() {
117+
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
118+
119+
List<SubscriptionPartitionSplit> splits =
120+
ImmutableList.of(
121+
SubscriptionPartitionSplit.create(
122+
exampleSubscriptionPath(), Partition.of(0), Offset.of(4)),
123+
SubscriptionPartitionSplit.create(
124+
exampleSubscriptionPath(), Partition.of(1), Offset.of(4)),
125+
SubscriptionPartitionSplit.create(
126+
exampleSubscriptionPath(), Partition.of(2), Offset.of(4)));
127+
SplitDiscovery restored =
128+
SingleSubscriptionSplitDiscovery.fromCheckpoint(
129+
proto, splits, mockAdminClient, mockCursorClient);
130+
131+
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
132+
.thenReturn(ApiFutures.immediateFuture(4L));
133+
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
134+
.thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(3), Offset.of(2))));
135+
assertThat(restored.discoverNewSplits()).hasSize(1);
136+
}
137+
138+
@Test
139+
public void testCheckpointRestore_SubscriptionMismatch() {
140+
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
141+
142+
List<SubscriptionPartitionSplit> splits =
143+
ImmutableList.of(
144+
SubscriptionPartitionSplit.create(
145+
SubscriptionPath.parse(exampleSubscriptionPath().toString() + "-other"),
146+
Partition.of(0),
147+
Offset.of(4)));
148+
assertThrows(
149+
IllegalStateException.class,
150+
() -> {
151+
SingleSubscriptionSplitDiscovery.fromCheckpoint(
152+
proto, splits, mockAdminClient, mockCursorClient);
153+
});
154+
}
155+
156+
@Test
157+
public void testCheckpointRestore_NonContinuousPartitions() {
158+
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
159+
160+
List<SubscriptionPartitionSplit> splits =
161+
ImmutableList.of(
162+
SubscriptionPartitionSplit.create(
163+
exampleSubscriptionPath(), Partition.of(1), Offset.of(4)));
164+
assertThrows(
165+
IllegalStateException.class,
166+
() -> {
167+
SingleSubscriptionSplitDiscovery.fromCheckpoint(
168+
proto, splits, mockAdminClient, mockCursorClient);
169+
});
170+
}
171+
172+
@Test
173+
public void testClose() throws Exception {
174+
discovery.close();
175+
verify(mockAdminClient).close();
176+
verify(mockCursorClient).close();
177+
}
178+
}

0 commit comments

Comments
 (0)