Skip to content

Commit 8bc5058

Browse files
feat: Add a limit to the number of bytes that can be outstanding to a publisher (#54)
* sink outstanding limit * feat: Add a limit to the number of outstanding bytes for a publisher * address comments
1 parent 5e5caf3 commit 8bc5058

File tree

7 files changed

+82
-8
lines changed

7 files changed

+82
-8
lines changed

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public synchronized void open(Configuration parameters) throws Exception {
6363
publisher =
6464
new SerializingPublisher<>(
6565
new MessagePublisher(
66-
PerServerPublisherCache.getOrCreate(settings.getPublisherConfig())),
66+
PerServerPublisherCache.getOrCreate(settings.getPublisherConfig()),
67+
settings.maxBytesOutstanding()),
6768
settings.serializationSchema());
6869
}
6970

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSinkSettings.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323

2424
@AutoValue
2525
public abstract class PubsubLiteSinkSettings<InputT> implements Serializable {
26+
public static final int DEFAULT_MAX_BYTES_OUTSTANDING = 100 * 1024 * 1024;
2627
// Create a builder which will accept messages of type InputT and serialize them using the
2728
// provided serialization schema.
2829
public static <InputT> Builder<InputT> builder(PubsubLiteSerializationSchema<InputT> schema) {
29-
return new AutoValue_PubsubLiteSinkSettings.Builder<InputT>().setSerializationSchema(schema);
30+
return new AutoValue_PubsubLiteSinkSettings.Builder<InputT>()
31+
.setMaxBytesOutstanding(DEFAULT_MAX_BYTES_OUTSTANDING)
32+
.setSerializationSchema(schema);
3033
}
3134

3235
// Create a sink which will accept already serialized pubsub messages/
@@ -37,6 +40,9 @@ public static Builder<Message> messagesBuilder() {
3740
// Required. The path of the topic to publish messages to.
3841
public abstract TopicPath topicPath();
3942

43+
// Optional. The maximum number of bytes a sink task may have outstanding.
44+
public abstract Integer maxBytesOutstanding();
45+
4046
// Internal.
4147
abstract PubsubLiteSerializationSchema<InputT> serializationSchema();
4248

@@ -49,6 +55,9 @@ abstract static class Builder<InputT> {
4955
// Required.
5056
public abstract Builder<InputT> setTopicPath(TopicPath value);
5157

58+
// Optional.
59+
public abstract Builder<InputT> setMaxBytesOutstanding(Integer value);
60+
5261
// Internal.
5362
abstract Builder<InputT> setSerializationSchema(PubsubLiteSerializationSchema<InputT> value);
5463

src/main/java/com/google/cloud/pubsublite/flink/internal/sink/BulkWaitPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// Thread-compatible.
2121
public interface BulkWaitPublisher<T> {
2222

23-
void publish(T message);
23+
void publish(T message) throws InterruptedException;
2424

2525
void waitUntilNoOutstandingPublishes() throws CheckedApiException;
2626
}

src/main/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisher.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,47 @@
2222
import com.google.cloud.pubsublite.internal.CheckedApiException;
2323
import com.google.cloud.pubsublite.internal.ExtractStatus;
2424
import com.google.cloud.pubsublite.internal.Publisher;
25+
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
2526
import java.util.ArrayList;
2627
import java.util.List;
28+
import java.util.concurrent.Semaphore;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2731

2832
public class MessagePublisher implements BulkWaitPublisher<Message> {
33+
private static final Logger LOG = LoggerFactory.getLogger(MessagePublisher.class);
2934
private final Publisher<MessageMetadata> publisher;
3035
private final List<ApiFuture<MessageMetadata>> publishes;
3136

32-
public MessagePublisher(Publisher<MessageMetadata> publisher) {
37+
private final int maxBytesOutstanding;
38+
private final Semaphore bytesOutstanding;
39+
40+
public MessagePublisher(Publisher<MessageMetadata> publisher, int maxBytesOutstanding) {
3341
this.publisher = publisher;
3442
this.publishes = new ArrayList<>();
43+
this.maxBytesOutstanding = maxBytesOutstanding;
44+
this.bytesOutstanding = new Semaphore(maxBytesOutstanding);
45+
}
46+
47+
private int getAccountedSize(Message message) {
48+
long size = message.toProto().getSerializedSize();
49+
if (size > maxBytesOutstanding) {
50+
return maxBytesOutstanding;
51+
}
52+
return Math.toIntExact(size);
3553
}
3654

3755
@Override
38-
public void publish(Message message) {
39-
publishes.add(publisher.publish(message));
56+
public void publish(Message message) throws InterruptedException {
57+
final int size = getAccountedSize(message);
58+
if (!bytesOutstanding.tryAcquire()) {
59+
LOG.warn(
60+
"Publisher flow controlled due to too many bytes (>{}) outstanding", maxBytesOutstanding);
61+
bytesOutstanding.acquire(size);
62+
}
63+
ApiFuture<MessageMetadata> future = publisher.publish(message);
64+
future.addListener(() -> bytesOutstanding.release(size), SystemExecutors.getAlarmExecutor());
65+
publishes.add(future);
4066
}
4167

4268
@Override

src/main/java/com/google/cloud/pubsublite/flink/internal/sink/PublisherCache.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@
2525
import com.google.common.annotations.VisibleForTesting;
2626
import com.google.errorprone.annotations.concurrent.GuardedBy;
2727
import java.util.HashMap;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
/** A map of working publishers by PublisherOptions. */
3032
public class PublisherCache<T> implements AutoCloseable {
33+
private static final Logger LOG = LoggerFactory.getLogger(PublisherCache.class);
34+
3135
interface PublisherFactory<T> {
3236
Publisher<MessageMetadata> New(T options);
3337
}
@@ -46,6 +50,7 @@ private synchronized void evict(T options) {
4650
}
4751

4852
public synchronized Publisher<MessageMetadata> get(T options) throws ApiException {
53+
LOG.info("Requesting a publisher for options {}", options);
4954
Publisher<MessageMetadata> publisher = livePublishers.get(options);
5055
if (publisher != null) {
5156
return publisher;
@@ -55,11 +60,13 @@ public synchronized Publisher<MessageMetadata> get(T options) throws ApiExceptio
5560
new Listener() {
5661
@Override
5762
public void failed(State s, Throwable t) {
63+
LOG.error("Publisher for options {} failed with exception", options, t);
5864
evict(options);
5965
}
6066
},
6167
SystemExecutors.getAlarmExecutor());
6268
publisher.startAsync().awaitRunning();
69+
LOG.info("Successfully started publisher for options {}", options);
6370
livePublishers.put(options, publisher);
6471
return publisher;
6572
}
@@ -71,6 +78,7 @@ public synchronized void set(T options, Publisher<MessageMetadata> toCache) {
7178
new Listener() {
7279
@Override
7380
public void failed(State s, Throwable t) {
81+
LOG.error("Publisher for options {} failed with exception", options, t);
7482
evict(options);
7583
}
7684
},

src/main/java/com/google/cloud/pubsublite/flink/internal/sink/SerializingPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public SerializingPublisher(
3232
}
3333

3434
@Override
35-
public void publish(Tuple<T, Instant> message) {
35+
public void publish(Tuple<T, Instant> message) throws InterruptedException {
3636
inner.publish(schema.serialize(message.x(), message.y()));
3737
}
3838

src/test/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisherTest.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.cloud.pubsublite.internal.CheckedApiException;
3131
import com.google.cloud.pubsublite.internal.Publisher;
3232
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
33+
import com.google.protobuf.ByteString;
3334
import java.util.concurrent.Executors;
3435
import java.util.concurrent.Future;
3536
import org.junit.Before;
@@ -48,7 +49,7 @@ abstract static class FakePublisher extends FakeApiService
4849

4950
@Before
5051
public void setUp() {
51-
messagePublisher = new MessagePublisher(fakeInnerPublisher);
52+
messagePublisher = new MessagePublisher(fakeInnerPublisher, 1);
5253
}
5354

5455
@Test
@@ -105,4 +106,33 @@ public void testCheckpointWithOutstandingPublish() throws Exception {
105106
future.set(MessageMetadata.of(examplePartition(), exampleOffset()));
106107
checkpointFuture.get();
107108
}
109+
110+
@Test
111+
public void testPublishesOfMaximumSizeSerialized() throws Exception {
112+
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("one")).build();
113+
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("two")).build();
114+
SettableApiFuture<MessageMetadata> firstPublish = SettableApiFuture.create();
115+
when(fakeInnerPublisher.publish(message1)).thenReturn(firstPublish);
116+
when(fakeInnerPublisher.publish(message2))
117+
.thenReturn(
118+
ApiFutures.immediateFuture(MessageMetadata.of(examplePartition(), exampleOffset())));
119+
messagePublisher.publish(message1);
120+
verify(fakeInnerPublisher).publish(message1);
121+
122+
Future<?> secondPublish =
123+
Executors.newSingleThreadExecutor()
124+
.submit(
125+
() -> {
126+
try {
127+
messagePublisher.publish(message2);
128+
} catch (Exception e) {
129+
throw new RuntimeException(e);
130+
}
131+
});
132+
// Sleep for a short time so that the second publish could complete if it wasn't waiting.
133+
Thread.sleep(50);
134+
assertThat(secondPublish.isDone()).isFalse();
135+
firstPublish.set(MessageMetadata.of(examplePartition(), exampleOffset()));
136+
secondPublish.get();
137+
}
108138
}

0 commit comments

Comments
 (0)