diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 85a7d32e1705..42d2c7d971ad 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -18,6 +18,7 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io/ioutil" "os" @@ -1347,7 +1348,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { Data: []byte("should fail"), OrderingKey: orderingKey, }) - if _, err := r.Get(ctx); err == nil || !strings.Contains(err.Error(), "pubsub: Publishing for ordering key") { + if _, err := r.Get(ctx); err == nil || !errors.As(err, &ErrPublishingPaused{}) { t.Fatalf("expected ordering keys publish error, got %v", err) } diff --git a/pubsub/topic.go b/pubsub/topic.go index 36af7d582aa4..0cc1f95ad945 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -535,7 +535,8 @@ func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator { } } -var errTopicStopped = errors.New("pubsub: Stop has been called for this topic") +// ErrTopicStopped indicates that topic has been stopped and further publishing will fail. +var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic") // A PublishResult holds the result from a call to Publish. // @@ -548,6 +549,8 @@ var errTopicStopped = errors.New("pubsub: Stop has been called for this topic") // } type PublishResult = ipubsub.PublishResult +var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering") + // Publish publishes msg to the topic asynchronously. Messages are batched and // sent according to the topic's PublishSettings. Publish never blocks. // @@ -565,7 +568,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { - ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) + ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled) return r } @@ -582,7 +585,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { defer t.mu.RUnlock() // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here if t.stopped { - ipubsub.SetPublishResult(r, "", errTopicStopped) + ipubsub.SetPublishResult(r, "", ErrTopicStopped) return r } @@ -697,6 +700,16 @@ func (t *Topic) initBundler() { t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5 } +// ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key. +type ErrPublishingPaused struct { + OrderingKey string +} + +func (e ErrPublishingPaused) Error() string { + return fmt.Sprintf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", e.OrderingKey) + +} + func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name)) if err != nil { @@ -716,7 +729,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) var res *pb.PublishResponse start := time.Now() if orderingKey != "" && t.scheduler.IsPaused(orderingKey) { - err = fmt.Errorf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", orderingKey) + err = ErrPublishingPaused{OrderingKey: orderingKey} } else { // Apply custom publish retryer on top of user specified retryer and // default retryer. diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 13ccb0098091..5992d03f2c46 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -17,6 +17,7 @@ package pubsub import ( "bytes" "context" + "errors" "fmt" "strings" "sync" @@ -167,8 +168,8 @@ func TestStopPublishOrder(t *testing.T) { topic.Stop() r := topic.Publish(ctx, &Message{}) _, err := r.Get(ctx) - if err != errTopicStopped { - t.Errorf("got %v, want errTopicStopped", err) + if !errors.Is(err, ErrTopicStopped) { + t.Errorf("got %v, want ErrTopicStopped", err) } } @@ -460,8 +461,8 @@ func TestFlushStopTopic(t *testing.T) { r5 := topic.Publish(ctx, &Message{ Data: []byte("this should fail"), }) - if _, err := r5.Get(ctx); err != errTopicStopped { - t.Errorf("got %v, want errTopicStopped", err) + if _, err := r5.Get(ctx); !errors.Is(err, ErrTopicStopped) { + t.Errorf("got %v, want ErrTopicStopped", err) } } @@ -673,3 +674,19 @@ func addSingleResponse(srv *pstest.Server, id string) { MessageIds: []string{id}, }, nil) } + +func TestPublishOrderingNotEnabled(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + topic, err := c.CreateTopic(ctx, "test-topic") + if err != nil { + t.Fatal(err) + } + res := publishSingleMessageWithKey(ctx, topic, "test", "non-existent-key") + if _, err := res.Get(ctx); !errors.Is(err, errTopicOrderingNotEnabled) { + t.Errorf("got %v, want errTopicOrderingNotEnabled", err) + } +}