Skip to content

Commit

Permalink
pubsub: expose bundler.BufferedByteLimit to topic.PublishSettings
Browse files Browse the repository at this point in the history
Fixes googleapis#1440

Change-Id: Icc2e818320b26dad15920b8299c2dc29ee779166
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/41650
Reviewed-by: Jean de Klerk <deklerk@google.com>
Reviewed-by: kokoro <noreply+kokoro@google.com>
  • Loading branch information
hongalex authored and jeanbza committed Jun 6, 2019
1 parent c8433c9 commit ad9d9cb
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
17 changes: 10 additions & 7 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ type PublishSettings struct {

// The maximum time that the client will attempt to publish a bundle of messages.
Timeout time.Duration

// The maximum number of bytes that the Bundler will keep in memory before
// returning ErrOverflow.
BufferedByteLimit int
}

// DefaultPublishSettings holds the default values for topics' PublishSettings.
Expand All @@ -90,6 +94,10 @@ var DefaultPublishSettings = PublishSettings{
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
// By default, limit the bundler to 10 times the max message size. The number 10 is
// chosen as a reasonable amount of messages in the worst case whilst still
// capping the number to a low enough value to not OOM users.
BufferedByteLimit: 10 * MaxPublishRequestBytes,
}

// CreateTopic creates a new topic.
Expand Down Expand Up @@ -331,9 +339,6 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {

// TODO(jba) [from bcmills] consider using a shared channel per bundle
// (requires Bundler API changes; would reduce allocations)
// The call to Add should never return an error because the bundler's
// BufferedByteLimit is set to maxInt; we do not perform any flow
// control in the client.
err := t.bundler.Add(&bundledMessage{msg, r}, msg.size)
if err != nil {
r.set("", err)
Expand Down Expand Up @@ -426,10 +431,8 @@ func (t *Topic) initBundler() {
}
t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold

// Limit the bundler to 10 times the max message size. The number 10 is
// chosen as a reasonable amount of messages in the worst case whilst still
// capping the number to a low enough value to not OOM users.
t.bundler.BufferedByteLimit = 10 * MaxPublishRequestBytes
t.bundler.BufferedByteLimit = t.PublishSettings.BufferedByteLimit

t.bundler.BundleByteLimit = MaxPublishRequestBytes
// Unless overridden, allow many goroutines per CPU to call the Publish RPC concurrently.
// The default value was determined via extensive load testing (see the loadtest subdirectory).
Expand Down
25 changes: 25 additions & 0 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package pubsub

import (
"bytes"
"context"
"fmt"
"testing"
Expand All @@ -23,6 +24,7 @@ import (
"cloud.google.com/go/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -137,6 +139,29 @@ func TestPublishTimeout(t *testing.T) {
}
}

// TestPublishBufferedByteLimit verifies that a Publish whose message exceeds
// the topic's PublishSettings.BufferedByteLimit surfaces bundler.ErrOverflow
// through the returned PublishResult.
func TestPublishBufferedByteLimit(t *testing.T) {
	ctx := context.Background()
	client, srv := newFake(t)
	defer client.Close()
	defer srv.Close()

	topic := mustCreateTopic(t, client, "topic-small-buffered-byte-limit")
	defer topic.Stop()

	// Set BufferedByteLimit to a number of bytes smaller than the message
	// below, so the bundler rejects the message instead of buffering it.
	topic.PublishSettings.BufferedByteLimit = 100

	const messageSizeBytes = 1000

	// messageSizeBytes is an untyped constant, so no conversion is needed.
	msg := &Message{Data: bytes.Repeat([]byte{'A'}, messageSizeBytes)}
	res := topic.Publish(ctx, msg)

	_, err := res.Get(ctx)
	if err != bundler.ErrOverflow {
		t.Errorf("got %v, want %v", err, bundler.ErrOverflow)
	}
}

func TestUpdateTopic(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
Expand Down

0 comments on commit ad9d9cb

Please sign in to comment.