diff --git a/pubsub/topic.go b/pubsub/topic.go index d1fea115168c..32aa0f58e70d 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -82,6 +82,10 @@ type PublishSettings struct { // The maximum time that the client will attempt to publish a bundle of messages. Timeout time.Duration + + // The maximum number of bytes that the Bundler will keep in memory before + // returning ErrOverflow. + BufferedByteLimit int } // DefaultPublishSettings holds the default values for topics' PublishSettings. @@ -90,6 +94,10 @@ var DefaultPublishSettings = PublishSettings{ CountThreshold: 100, ByteThreshold: 1e6, Timeout: 60 * time.Second, + // By default, limit the bundler to 10 times the max message size. The number 10 is + // chosen as a reasonable amount of messages in the worst case whilst still + // capping the number to a low enough value to not OOM users. + BufferedByteLimit: 10 * MaxPublishRequestBytes, } // CreateTopic creates a new topic. @@ -331,9 +339,6 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { // TODO(jba) [from bcmills] consider using a shared channel per bundle // (requires Bundler API changes; would reduce allocations) - // The call to Add should never return an error because the bundler's - // BufferedByteLimit is set to maxInt; we do not perform any flow - // control in the client. err := t.bundler.Add(&bundledMessage{msg, r}, msg.size) if err != nil { r.set("", err) @@ -426,10 +431,8 @@ func (t *Topic) initBundler() { } t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold - // Limit the bundler to 10 times the max message size. The number 10 is - // chosen as a reasonable amount of messages in the worst case whilst still - // capping the number to a low enough value to not OOM users. - t.bundler.BufferedByteLimit = 10 * MaxPublishRequestBytes + t.bundler.BufferedByteLimit = t.PublishSettings.BufferedByteLimit + t.bundler.BundleByteLimit = MaxPublishRequestBytes // Unless overridden, allow many goroutines per CPU to call the Publish RPC concurrently. // The default value was determined via extensive load testing (see the loadtest subdirectory). diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 752f4811627f..43d3d3e728a3 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -15,6 +15,7 @@ package pubsub import ( + "bytes" "context" "fmt" "testing" @@ -23,6 +24,7 @@ import ( "cloud.google.com/go/internal/testutil" "google.golang.org/api/iterator" "google.golang.org/api/option" + "google.golang.org/api/support/bundler" pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -137,6 +139,29 @@ func TestPublishTimeout(t *testing.T) { } } +func TestPublishBufferedByteLimit(t *testing.T) { + ctx := context.Background() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "topic-small-buffered-byte-limit") + defer topic.Stop() + + // Test setting BufferedByteLimit to small number of bytes that should fail. + topic.PublishSettings.BufferedByteLimit = 100 + + const messageSizeBytes = 1000 + + msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))} + res := topic.Publish(ctx, msg) + + _, err := res.Get(ctx) + if err != bundler.ErrOverflow { + t.Errorf("got %v, want ErrOverflow", err) + } +} + func TestUpdateTopic(t *testing.T) { ctx := context.Background() client, srv := newFake(t)