Skip to content

Commit 52ab0ba

Browse files
feat: Add a publisher cache for the pubsub lite sink (#18)
* publishers refactoring tests * test improvements * respond to comments * respond to comments
1 parent 5e1bb41 commit 52ab0ba

File tree

5 files changed

+316
-0
lines changed

5 files changed

+316
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.sink;
17+
18+
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
19+
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
20+
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
21+
22+
import com.google.cloud.pubsublite.AdminClient;
23+
import com.google.cloud.pubsublite.AdminClientSettings;
24+
import com.google.cloud.pubsublite.CloudRegion;
25+
import com.google.cloud.pubsublite.MessageMetadata;
26+
import com.google.cloud.pubsublite.Partition;
27+
import com.google.cloud.pubsublite.TopicPath;
28+
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
29+
import com.google.cloud.pubsublite.internal.Publisher;
30+
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
31+
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
32+
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
33+
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
34+
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
35+
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
36+
37+
public class PerServerPublisherCache {
38+
static PublisherCache<PublisherOptions> cache =
39+
new PublisherCache<>(PerServerPublisherCache::newPublisher);
40+
41+
private static PublisherServiceClient newServiceClient(TopicPath topic, Partition partition) {
42+
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
43+
settingsBuilder =
44+
addDefaultMetadata(
45+
PubsubContext.of(PubsubContext.Framework.of("FLINK")),
46+
RoutingMetadata.of(topic, partition),
47+
settingsBuilder);
48+
try {
49+
return PublisherServiceClient.create(
50+
addDefaultSettings(topic.location().region(), settingsBuilder));
51+
} catch (Throwable t) {
52+
throw toCanonical(t).underlying;
53+
}
54+
}
55+
56+
private static AdminClient getAdminClient(CloudRegion region) {
57+
return AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build());
58+
}
59+
60+
private static Publisher<MessageMetadata> newPublisher(PublisherOptions options) {
61+
return PartitionCountWatchingPublisherSettings.newBuilder()
62+
.setTopic(options.topicPath())
63+
.setPublisherFactory(
64+
partition ->
65+
SinglePartitionPublisherBuilder.newBuilder()
66+
.setTopic(options.topicPath())
67+
.setPartition(partition)
68+
.setServiceClient(newServiceClient(options.topicPath(), partition))
69+
.setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
70+
.build())
71+
.setAdminClient(getAdminClient(options.topicPath().location().region()))
72+
.build()
73+
.instantiate();
74+
}
75+
76+
public static Publisher<MessageMetadata> getOrCreate(PublisherOptions options) {
77+
return cache.get(options);
78+
}
79+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.flink.sink;
18+
19+
import com.google.api.core.ApiService.Listener;
20+
import com.google.api.core.ApiService.State;
21+
import com.google.api.gax.rpc.ApiException;
22+
import com.google.cloud.pubsublite.MessageMetadata;
23+
import com.google.cloud.pubsublite.internal.Publisher;
24+
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
25+
import com.google.common.annotations.VisibleForTesting;
26+
import com.google.errorprone.annotations.concurrent.GuardedBy;
27+
import java.util.HashMap;
28+
29+
/** A map of working publishers by PublisherOptions. */
30+
class PublisherCache<T> implements AutoCloseable {
31+
interface PublisherFactory<T> {
32+
Publisher<MessageMetadata> New(T options);
33+
}
34+
35+
@GuardedBy("this")
36+
private final HashMap<T, Publisher<MessageMetadata>> livePublishers = new HashMap<>();
37+
38+
private final PublisherFactory<T> factory;
39+
40+
PublisherCache(PublisherFactory<T> factory) {
41+
this.factory = factory;
42+
}
43+
44+
private synchronized void evict(T options) {
45+
livePublishers.remove(options);
46+
}
47+
48+
synchronized Publisher<MessageMetadata> get(T options) throws ApiException {
49+
Publisher<MessageMetadata> publisher = livePublishers.get(options);
50+
if (publisher != null) {
51+
return publisher;
52+
}
53+
publisher = factory.New(options);
54+
livePublishers.put(options, publisher);
55+
publisher.addListener(
56+
new Listener() {
57+
@Override
58+
public void failed(State s, Throwable t) {
59+
evict(options);
60+
}
61+
},
62+
SystemExecutors.getAlarmExecutor());
63+
publisher.startAsync().awaitRunning();
64+
return publisher;
65+
}
66+
67+
@VisibleForTesting
68+
synchronized void set(T options, Publisher<MessageMetadata> toCache) {
69+
livePublishers.put(options, toCache);
70+
}
71+
72+
@Override
73+
public synchronized void close() {
74+
livePublishers.forEach(((options, publisher) -> publisher.stopAsync()));
75+
livePublishers.clear();
76+
}
77+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.sink;
17+
18+
import com.google.auto.value.AutoValue;
19+
import com.google.cloud.pubsublite.TopicPath;
20+
21+
@AutoValue
22+
public abstract class PublisherOptions {
23+
24+
public abstract TopicPath topicPath();
25+
26+
public static PublisherOptions create(TopicPath path) {
27+
return new AutoValue_PublisherOptions(path);
28+
}
29+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.sink;
17+
18+
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleTopicPath;
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import com.google.cloud.pubsublite.MessageMetadata;
22+
import com.google.cloud.pubsublite.internal.Publisher;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.mockito.Mock;
26+
import org.mockito.junit.MockitoJUnitRunner;
27+
28+
@RunWith(MockitoJUnitRunner.class)
29+
public class PerServerPublisherCacheTest {
30+
31+
@Mock Publisher<MessageMetadata> publisher;
32+
33+
@Test
34+
public void testCachedOptions() {
35+
PublisherOptions options = PublisherOptions.create(exampleTopicPath());
36+
PerServerPublisherCache.cache.set(options, publisher);
37+
assertThat(PerServerPublisherCache.getOrCreate(options)).isEqualTo(publisher);
38+
}
39+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink.sink;
17+
18+
import static com.google.common.truth.Truth.assertThat;
19+
import static org.mockito.Mockito.*;
20+
import static org.mockito.Mockito.times;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.when;
23+
24+
import com.google.api.core.ApiService.State;
25+
import com.google.cloud.pubsublite.MessageMetadata;
26+
import com.google.cloud.pubsublite.internal.Publisher;
27+
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.mockito.Mock;
32+
import org.mockito.junit.MockitoJUnitRunner;
33+
34+
@RunWith(MockitoJUnitRunner.class)
35+
public class PublisherCacheTest {
36+
abstract static class FakePublisher extends FakeApiService
37+
implements Publisher<MessageMetadata> {}
38+
39+
@Mock PublisherCache.PublisherFactory<String> mockFactory;
40+
PublisherCache<String> cache;
41+
42+
@Before
43+
public void setUp() {
44+
cache = new PublisherCache<>(mockFactory);
45+
}
46+
47+
@Test
48+
public void testPublisherStarted() {
49+
FakePublisher pub = spy(FakePublisher.class);
50+
when(mockFactory.New("key")).thenReturn(pub);
51+
assertThat(cache.get("key")).isEqualTo(pub);
52+
assertThat(pub.state()).isEqualTo(State.RUNNING);
53+
}
54+
55+
@Test
56+
public void testPublisherCached() {
57+
FakePublisher pub = spy(FakePublisher.class);
58+
when(mockFactory.New("key")).thenReturn(pub);
59+
assertThat(cache.get("key")).isEqualTo(pub);
60+
assertThat(cache.get("key")).isEqualTo(pub);
61+
verify(mockFactory, times(1)).New("key");
62+
}
63+
64+
@Test
65+
public void testFailedPublisherEvicted() throws InterruptedException {
66+
FakePublisher pub1 = spy(FakePublisher.class);
67+
FakePublisher pub2 = spy(FakePublisher.class);
68+
when(mockFactory.New("key")).thenReturn(pub1).thenReturn(pub2);
69+
assertThat(cache.get("key")).isEqualTo(pub1);
70+
pub1.fail(new RuntimeException("failure"));
71+
while (cache.get("key").equals(pub1)) {
72+
Thread.sleep(100);
73+
}
74+
assertThat(cache.get("key")).isEqualTo(pub2);
75+
}
76+
77+
@Test
78+
public void testClose() {
79+
FakePublisher pub1 = spy(FakePublisher.class);
80+
when(mockFactory.New("key")).thenReturn(pub1);
81+
assertThat(cache.get("key")).isEqualTo(pub1);
82+
cache.close();
83+
verify(pub1).stopAsync();
84+
}
85+
86+
@Test
87+
public void testSet() {
88+
FakePublisher pub = spy(FakePublisher.class);
89+
cache.set("key", pub);
90+
assertThat(cache.get("key")).isEqualTo(pub);
91+
}
92+
}

0 commit comments

Comments
 (0)