Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add publisher flow control support #4292

Merged
merged 26 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4854aa3
feat(pubsub): add publish flow control
hongalex Jan 7, 2021
b4789ca
add SignalError logic to flow controller
hongalex Jun 11, 2021
4cf48d8
feat(pubsub/pstest): add channel to support user-defined publish resp…
hongalex Jun 11, 2021
5db7ba7
resolve bad merge for proto library migration
hongalex Jun 12, 2021
0e002d8
resolve merge with updated flow controller
hongalex Jun 15, 2021
023a92a
Merge branch 'master' of ssh://github.com/googleapis/google-cloud-go …
hongalex Jun 15, 2021
8bfe09e
move flow control settings to struct
hongalex Jun 15, 2021
7cadb33
fix issue with max bundler size calculation
hongalex Jun 17, 2021
119e3fa
Merge branch 'master' of ssh://github.com/googleapis/google-cloud-go …
hongalex Jun 21, 2021
df39d40
add integration tests for publish flow control
hongalex Jun 22, 2021
3b5fd1f
improve docs
hongalex Jun 22, 2021
c3c0141
reduce diff for review
hongalex Jun 22, 2021
11d68fa
fix double counting issue with messages
hongalex Jun 24, 2021
7c39ff2
Merge branch 'master' into pubsub-publish-fc
hongalex Jul 1, 2021
e1ba281
fix exported variable comments
hongalex Jul 1, 2021
f935174
do not report oc metrics if flow controller is disabled
hongalex Jul 9, 2021
99e3d59
disable MaxOutstandingMessages by default
hongalex Jul 13, 2021
fd434ce
Merge branch 'master' into pubsub-publish-fc
hongalex Jul 13, 2021
2c4bd0a
Merge branch 'master' into pubsub-publish-fc
hongalex Jul 13, 2021
cbb147f
Merge branch 'master' of ssh://github.com/googleapis/google-cloud-go …
hongalex Jul 30, 2021
79b1cea
Merge branch 'master' into pubsub-publish-fc
hongalex Aug 26, 2021
30c46db
Merge branch 'master' into pubsub-publish-fc
hongalex Sep 1, 2021
324062b
override bufferedbytelimit
hongalex Sep 1, 2021
0d5c903
make default fc behavior ignore
hongalex Sep 3, 2021
796e7dd
Merge branch 'master' into pubsub-publish-fc
hongalex Sep 3, 2021
a7a7ec2
Merge branch 'master' into pubsub-publish-fc
hongalex Sep 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 99 additions & 57 deletions pubsub/flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,51 @@ package pubsub

import (
"context"
"errors"
"sync/atomic"

"golang.org/x/sync/semaphore"
)

// flowController implements flow control for Subscription.Receive.
// LimitExceededBehavior configures the behavior that flowController can use in case
// the flow control limits are exceeded.
type LimitExceededBehavior int

const (
// FlowControlBlock signals to wait until the request can be made without exceeding the limit.
FlowControlBlock LimitExceededBehavior = iota
// FlowControlIgnore disables flow control.
FlowControlIgnore
// FlowControlSignalError signals an error to the caller of acquire.
FlowControlSignalError
)

// FlowControlSettings controls flow control for messages while publishing or subscribing.
type FlowControlSettings struct {
// MaxOutstandingMessages is the maximum number of bufered messages to be published.
// If less than or equal to zero, this is disabled.
MaxOutstandingMessages int

// MaxOutstandingBytes is the maximum size of buffered messages to be published.
// If less than or equal to zero, this is disabled.
MaxOutstandingBytes int

// LimitExceededBehavior configures the behavior when trying to publish
// additional messages while the flow controller is full. The available options
// include Block (default), Ignore (disable), and SignalError (publish
// results will return an error).
LimitExceededBehavior LimitExceededBehavior
}

var (
// ErrFlowControllerMaxOutstandingMessages indicates that outstanding messages exceeds MaxOutstandingMessages.
ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")

// ErrFlowControllerMaxOutstandingBytes indicates that outstanding bytes of messages exceeds MaxOutstandingBytes.
ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
)

// flowController implements flow control for publishing and subscribing.
type flowController struct {
maxCount int
maxSize int // max total size of messages
Expand All @@ -33,93 +72,94 @@ type flowController struct {
countRemaining int64
// Number of outstanding bytes remaining. Atomic.
bytesRemaining int64
limitBehavior LimitExceededBehavior
}

// newFlowController creates a new flowController that ensures no more than
// maxCount messages or maxSize bytes are outstanding at once. If maxCount or
// maxSize is < 1, then an unlimited number of messages or bytes is permitted,
// respectively.
func newFlowController(maxCount, maxSize int) *flowController {
fc := &flowController{
maxCount: maxCount,
maxSize: maxSize,
semCount: nil,
semSize: nil,
func newFlowController(fc FlowControlSettings) flowController {
f := flowController{
maxCount: fc.MaxOutstandingMessages,
maxSize: fc.MaxOutstandingBytes,
semCount: nil,
semSize: nil,
limitBehavior: fc.LimitExceededBehavior,
}
if maxCount > 0 {
fc.semCount = semaphore.NewWeighted(int64(maxCount))
if fc.MaxOutstandingMessages > 0 {
f.semCount = semaphore.NewWeighted(int64(fc.MaxOutstandingMessages))
}
if maxSize > 0 {
fc.semSize = semaphore.NewWeighted(int64(maxSize))
if fc.MaxOutstandingBytes > 0 {
f.semSize = semaphore.NewWeighted(int64(fc.MaxOutstandingBytes))
}
return fc
return f
}

// acquire blocks until one message of size bytes can proceed or ctx is done.
// It returns nil in the first case, or ctx.Err() in the second.
// acquire allocates space for a message: the message count and its size.
//
// acquire allows large messages to proceed by treating a size greater than maxSize
// In FlowControlSignalError mode, large messages greater than maxSize
// will be result in an error. In other modes, large messages will be treated
// as if it were equal to maxSize.
func (f *flowController) acquire(ctx context.Context, size int) error {
if f.semCount != nil {
if err := f.semCount.Acquire(ctx, 1); err != nil {
return err
switch f.limitBehavior {
case FlowControlIgnore:
return nil
case FlowControlBlock:
if f.semCount != nil {
if err := f.semCount.Acquire(ctx, 1); err != nil {
return err
}
}
}
if f.semSize != nil {
if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
if f.semCount != nil {
f.semCount.Release(1)
if f.semSize != nil {
if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
if f.semCount != nil {
f.semCount.Release(1)
}
return err
}
}
case FlowControlSignalError:
if f.semCount != nil {
if !f.semCount.TryAcquire(1) {
return ErrFlowControllerMaxOutstandingMessages
}
}
if f.semSize != nil {
// Try to acquire the full size of the message here.
if !f.semSize.TryAcquire(int64(size)) {
if f.semCount != nil {
f.semCount.Release(1)
}
return ErrFlowControllerMaxOutstandingBytes
}
return err
}
}
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return nil
}

// tryAcquire returns false if acquire would block. Otherwise, it behaves like
// acquire and returns true.
//
// tryAcquire allows large messages to proceed by treating a size greater than
// maxSize as if it were equal to maxSize.
func (f *flowController) tryAcquire(ctx context.Context, size int) bool {
if f.semCount != nil {
if !f.semCount.TryAcquire(1) {
return false
}
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
}
if f.semSize != nil {
if !f.semSize.TryAcquire(f.bound(size)) {
if f.semCount != nil {
f.semCount.Release(1)
}
return false
}
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
}
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return true
return nil
}

// release notes that one message of size bytes is no longer outstanding.
func (f *flowController) release(ctx context.Context, size int) {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
if f.limitBehavior == FlowControlIgnore {
return
}

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.semCount.Release(1)
}
if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.semSize.Release(f.bound(size))
}
}
Expand All @@ -131,6 +171,8 @@ func (f *flowController) bound(size int) int64 {
return int64(size)
}

// count returns the number of outstanding messages.
// if maxCount is 0, this will always return 0.
func (f *flowController) count() int {
return int(atomic.LoadInt64(&f.countRemaining))
}
69 changes: 28 additions & 41 deletions pubsub/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ import (
"golang.org/x/sync/errgroup"
)

func fcSettings(c int, s int, l LimitExceededBehavior) FlowControlSettings {
return FlowControlSettings{
MaxOutstandingMessages: c,
MaxOutstandingBytes: s,
LimitExceededBehavior: l,
}
}

func TestFlowControllerCancel(t *testing.T) {
// Test canceling a flow controller's context.
t.Parallel()
fc := newFlowController(3, 10)
fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
if err := fc.acquire(context.Background(), 5); err != nil {
t.Fatal(err)
}
Expand All @@ -51,7 +59,7 @@ func TestFlowControllerCancel(t *testing.T) {
func TestFlowControllerLargeRequest(t *testing.T) {
// Large requests succeed, consuming the entire allotment.
t.Parallel()
fc := newFlowController(3, 10)
fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
err := fc.acquire(context.Background(), 11)
if err != nil {
t.Fatal(err)
Expand All @@ -64,7 +72,7 @@ func TestFlowControllerNoStarve(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
fc := newFlowController(10, 10)
fc := newFlowController(fcSettings(10, 10, FlowControlBlock))
first := make(chan int)
for i := 0; i < 20; i++ {
go func() {
Expand Down Expand Up @@ -120,7 +128,7 @@ func TestFlowControllerSaturation(t *testing.T) {
wantSize: 9,
},
} {
fc := newFlowController(maxCount, maxSize)
fc := newFlowController(fcSettings(maxCount, maxSize, FlowControlBlock))
// Atomically track flow controller state.
// The flowController itself tracks count.
var curSize int64
Expand Down Expand Up @@ -174,60 +182,39 @@ func TestFlowControllerSaturation(t *testing.T) {
}
}

func TestFlowControllerTryAcquire(t *testing.T) {
t.Parallel()
fc := newFlowController(3, 10)
ctx := context.Background()

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire 7 bytes.
if fc.tryAcquire(ctx, 7) {
t.Error("got true, wanted false")
}

// Successfully tryAcquire 6 byte.
if !fc.tryAcquire(ctx, 6) {
t.Error("got false, wanted true")
}
}

func TestFlowControllerUnboundedCount(t *testing.T) {
t.Parallel()
ctx := context.Background()
fc := newFlowController(0, 10)
fc := newFlowController(fcSettings(0, 10, FlowControlSignalError))

// Successfully acquire 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
// Successfully acquire 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Fail to tryAcquire 3 bytes.
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
// Fail to acquire 3 bytes.
if err := fc.acquire(ctx, 3); err == nil {
t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingBytes)
}
}

func TestFlowControllerUnboundedCount2(t *testing.T) {
t.Parallel()
ctx := context.Background()
fc := newFlowController(0, 0)
fc := newFlowController(fcSettings(0, 0, FlowControlSignalError))
// Successfully acquire 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
fc.release(ctx, 1)
fc.release(ctx, 1)
fc.release(ctx, 1)
wantCount := int64(-2)
wantCount := int64(0)
c := int64(fc.count())
if c != wantCount {
t.Fatalf("got count %d, want %d", c, wantCount)
Expand All @@ -237,20 +224,20 @@ func TestFlowControllerUnboundedCount2(t *testing.T) {
func TestFlowControllerUnboundedBytes(t *testing.T) {
t.Parallel()
ctx := context.Background()
fc := newFlowController(2, 0)
fc := newFlowController(fcSettings(2, 0, FlowControlSignalError))

// Successfully acquire 4GB.
if err := fc.acquire(ctx, 4e9); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Successfully tryAcquire 4GB bytes.
if !fc.tryAcquire(ctx, 4e9) {
t.Error("got false, wanted true")
// Successfully acquired 4GB bytes.
if err := fc.acquire(ctx, 4e9); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Fail to tryAcquire a third message.
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
// Fail to acquire a third message.
if err := fc.acquire(ctx, 3); err == nil {
t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingMessages)
}
}
1 change: 1 addition & 0 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ func TestIntegration_LargePublishSize(t *testing.T) {
msg := &Message{
Data: bytes.Repeat([]byte{'A'}, maxLengthSingleMessage),
}
topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError
r := topic.Publish(ctx, msg)
if _, err := r.Get(ctx); err != nil {
t.Fatalf("Failed to publish max length message: %v", err)
Expand Down
8 changes: 6 additions & 2 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
}
fc := newFlowController(maxCount, maxBytes)
fc := newFlowController(FlowControlSettings{
MaxOutstandingMessages: maxCount,
MaxOutstandingBytes: maxBytes,
LimitExceededBehavior: FlowControlBlock,
})

sched := scheduler.NewReceiveScheduler(maxCount)

Expand Down Expand Up @@ -982,7 +986,7 @@ func (s *Subscription) checkOrdering() {
type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
maxPrefetch int32
maxPrefetch int32 // the max number of outstanding messages, used to calculate maxToPull
// If true, use unary Pull instead of StreamingPull, and never pull more
// than maxPrefetch messages.
synchronous bool
Expand Down
Loading