From c3c0141b988ba05df3e16551b72771d13cd5edd6 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 21 Jun 2021 22:36:46 -0700 Subject: [PATCH] reduce diff for review switch flowController to struct not pointer fix more comments fix more comments --- pubsub/flow_controller.go | 4 +- pubsub/flow_controller_test.go | 14 +-- pubsub/integration_test.go | 168 ++++++++++++++++----------------- pubsub/topic.go | 11 +-- 4 files changed, 96 insertions(+), 101 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index edc844fe7845..c779d83141a1 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -75,8 +75,8 @@ type flowController struct { // maxCount messages or maxSize bytes are outstanding at once. If maxCount or // maxSize is < 1, then an unlimited number of messages or bytes is permitted, // respectively. -func newFlowController(fc FlowControlSettings) *flowController { - f := &flowController{ +func newFlowController(fc FlowControlSettings) flowController { + f := flowController{ maxCount: fc.MaxOutstandingMessages, maxSize: fc.MaxOutstandingBytes, semCount: nil, diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index 0cf9038e0633..320a96a62c21 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -187,19 +187,19 @@ func TestFlowControllerTryAcquire(t *testing.T) { fc := newFlowController(fcSettings(3, 10, FlowControlSignalError)) ctx := context.Background() - // Successfully newAcquire 4 bytes. + // Successfully acquired 4 bytes. if err := fc.acquire(ctx, 4); err != nil { - t.Errorf("fc.newAcquire got err: %v", err) + t.Errorf("fc.acquired got err: %v", err) } - // Fail to newAcquire 7 bytes. + // Fail to acquire 7 bytes. if err := fc.acquire(ctx, 7); err == nil { t.Errorf("got nil, wanted err: %v", ErrFlowControllerMaxOutstandingBytes) } - // Successfully newAcquire 6 byte. + // Successfully acquired 6 byte. if err := fc.acquire(ctx, 6); err != nil { - t.Errorf("fc.newAcquire got err: %v", err) + t.Errorf("fc.acquired got err: %v", err) } } @@ -252,12 +252,12 @@ func TestFlowControllerUnboundedBytes(t *testing.T) { t.Errorf("got %v, wanted no error", err) } - // Successfully newAcquire 4GB bytes. + // Successfully acquired 4GB bytes. if err := fc.acquire(ctx, 4e9); err != nil { t.Errorf("got %v, wanted no error", err) } - // Fail to newAcquire a third message. + // Fail to acquire a third message. if err := fc.acquire(ctx, 3); err == nil { t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingMessages) } diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index dc2cf60b336c..659e9f050f6b 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -224,96 +224,92 @@ func withGoogleClientInfo(ctx context.Context) context.Context { } func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) { - t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,numMsgs:%d", maxMsgs, synchronous, numMsgs), - func(t *testing.T) { - t.Parallel() - ctx := context.Background() - topic, err := client.CreateTopic(ctx, topicIDs.New()) - if err != nil { - t.Errorf("CreateTopic error: %v", err) - } - defer topic.Stop() - exists, err := topic.Exists(ctx) - if err != nil { - t.Fatalf("TopicExists error: %v", err) - } - if !exists { - t.Errorf("topic %v should exist, but it doesn't", topic) - } + ctx := context.Background() + topic, err := client.CreateTopic(ctx, topicIDs.New()) + if err != nil { + t.Errorf("CreateTopic error: %v", err) + } + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatalf("TopicExists error: %v", err) + } + if !exists { + t.Errorf("topic %v should exist, but it doesn't", topic) + } - var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { - t.Errorf("CreateSub error: %v", err) - } - exists, err = sub.Exists(ctx) - if err != nil { - t.Fatalf("SubExists error: %v", err) - } - if !exists { - t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) - } - var msgs []*Message - for i := 0; i < numMsgs; i++ { - text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) - attrs := make(map[string]string) - attrs["foo"] = "bar" - msgs = append(msgs, &Message{ - Data: []byte(text), - Attributes: attrs, - }) - } + var sub *Subscription + if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { + t.Errorf("CreateSub error: %v", err) + } + exists, err = sub.Exists(ctx) + if err != nil { + t.Fatalf("SubExists error: %v", err) + } + if !exists { + t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) + } + var msgs []*Message + for i := 0; i < numMsgs; i++ { + text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) + attrs := make(map[string]string) + attrs["foo"] = "bar" + msgs = append(msgs, &Message{ + Data: []byte(text), + Attributes: attrs, + }) + } - // Publish some messages. - type pubResult struct { - m *Message - r *PublishResult - } - var rs []pubResult - for _, m := range msgs { - r := topic.Publish(ctx, m) - rs = append(rs, pubResult{m, r}) - } - want := make(map[string]messageData) - for _, res := range rs { - id, err := res.r.Get(ctx) - if err != nil { - t.Fatal(err) - } - md := extractMessageData(res.m) - md.ID = id - want[md.ID] = md - } + // Publish some messages. + type pubResult struct { + m *Message + r *PublishResult + } + var rs []pubResult + for _, m := range msgs { + r := topic.Publish(ctx, m) + rs = append(rs, pubResult{m, r}) + } + want := make(map[string]messageData) + for _, res := range rs { + id, err := res.r.Get(ctx) + if err != nil { + t.Fatal(err) + } + md := extractMessageData(res.m) + md.ID = id + want[md.ID] = md + } - sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs - sub.ReceiveSettings.Synchronous = synchronous - - // Use a timeout to ensure that Pull does not block indefinitely if there are - // unexpectedly few messages available. - now := time.Now() - timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { - m.Ack() - }) - if err != nil { - if c := status.Convert(err); c.Code() == codes.Canceled { - if time.Since(now) >= time.Minute { - t.Fatal("pullN took too long") - } - } else { - t.Fatalf("Pull: %v", err) - } - } - got := make(map[string]messageData) - for _, m := range gotMsgs { - md := extractMessageData(m) - got[md.ID] = md - } - if !testutil.Equal(got, want) { - t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", - maxMsgs, synchronous, got, want) + sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs + sub.ReceiveSettings.Synchronous = synchronous + + // Use a timeout to ensure that Pull does not block indefinitely if there are + // unexpectedly few messages available. + now := time.Now() + timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { + m.Ack() + }) + if err != nil { + if c := status.Convert(err); c.Code() == codes.Canceled { + if time.Since(now) >= time.Minute { + t.Fatal("pullN took too long") } - }) + } else { + t.Fatalf("Pull: %v", err) + } + } + got := make(map[string]messageData) + for _, m := range gotMsgs { + md := extractMessageData(m) + got[md.ID] = md + } + if !testutil.Equal(got, want) { + t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", + maxMsgs, synchronous, got, want) + } } // IAM tests. diff --git a/pubsub/topic.go b/pubsub/topic.go index ff3e396a81be..1d1979af1619 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -68,7 +68,7 @@ type Topic struct { stopped bool scheduler *scheduler.PublishScheduler - fc *flowController + flowController // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool @@ -456,7 +456,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { return r } - if err := t.fc.acquire(ctx, msgSize); err != nil { + if err := t.flowController.acquire(ctx, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) return r @@ -564,7 +564,7 @@ func (t *Topic) initBundler() { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages } - t.fc = newFlowController(fcs) + t.flowController = newFlowController(fcs) bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit if t.PublishSettings.BufferedByteLimit > 0 { @@ -586,12 +586,11 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) var orderingKey string for i, bm := range bms { orderingKey = bm.msg.OrderingKey - msg := &pb.PubsubMessage{ + pbMsgs[i] = &pb.PubsubMessage{ Data: bm.msg.Data, Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } - pbMsgs[i] = msg bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse @@ -616,7 +615,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) PublishLatency.M(float64(end.Sub(start)/time.Millisecond)), PublishedMessages.M(int64(len(bms)))) for i, bm := range bms { - t.fc.release(ctx, bm.size) + t.flowController.release(ctx, bm.size) if err != nil { ipubsub.SetPublishResult(bm.res, "", err) } else {