Skip to content

Commit 5e5caf3

Browse files
fix: Fix errors caused by client library upgrade (#50)
1 parent 56ce037 commit 5e5caf3

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSourceSettings.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ private static SubscriberServiceClient newSubscriberServiceClient(
104104
RoutingMetadata.of(path, partition),
105105
settingsBuilder);
106106
return SubscriberServiceClient.create(
107-
addDefaultSettings(path.location().region(), settingsBuilder));
107+
addDefaultSettings(path.location().extractRegion(), settingsBuilder));
108108
} catch (Throwable t) {
109109
throw toCanonical(t).underlying;
110110
}
@@ -127,7 +127,9 @@ AdminClient getAdminClient() {
127127
return adminClientSupplier().get();
128128
}
129129
return AdminClient.create(
130-
AdminClientSettings.newBuilder().setRegion(subscriptionPath().location().region()).build());
130+
AdminClientSettings.newBuilder()
131+
.setRegion(subscriptionPath().location().extractRegion())
132+
.build());
131133
}
132134

133135
CursorClient getCursorClient() {
@@ -136,7 +138,7 @@ CursorClient getCursorClient() {
136138
}
137139
return CursorClient.create(
138140
CursorClientSettings.newBuilder()
139-
.setRegion(subscriptionPath().location().region())
141+
.setRegion(subscriptionPath().location().extractRegion())
140142
.build());
141143
}
142144

src/main/java/com/google/cloud/pubsublite/flink/internal/sink/PerServerPublisherCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ private static PublisherServiceClient newServiceClient(TopicPath topic, Partitio
4848
settingsBuilder);
4949
try {
5050
return PublisherServiceClient.create(
51-
addDefaultSettings(topic.location().region(), settingsBuilder));
51+
addDefaultSettings(topic.location().extractRegion(), settingsBuilder));
5252
} catch (Throwable t) {
5353
throw toCanonical(t).underlying;
5454
}
@@ -69,7 +69,7 @@ private static Publisher<MessageMetadata> newPublisher(PublisherOptions options)
6969
.setServiceClient(newServiceClient(options.topicPath(), partition))
7070
.setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
7171
.build())
72-
.setAdminClient(getAdminClient(options.topicPath().location().region()))
72+
.setAdminClient(getAdminClient(options.topicPath().location().extractRegion()))
7373
.build()
7474
.instantiate();
7575
}

src/main/java/com/google/cloud/pubsublite/flink/internal/sink/PublisherCache.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ public void failed(State s, Throwable t) {
6767
@VisibleForTesting
6868
public synchronized void set(T options, Publisher<MessageMetadata> toCache) {
6969
livePublishers.put(options, toCache);
70+
toCache.addListener(
71+
new Listener() {
72+
@Override
73+
public void failed(State s, Throwable t) {
74+
evict(options);
75+
}
76+
},
77+
SystemExecutors.getAlarmExecutor());
7078
}
7179

7280
@Override

0 commit comments

Comments
 (0)