Skip to content

Commit

Permalink
feat(pubsub): expose common errors for easier handling (#7940)
Browse files Browse the repository at this point in the history
* feat(pubsub): expose common errors for easier handling

* unexport topic ordering not enabled error, switch ErrPublishingPaused to custom error
  • Loading branch information
hongalex authored May 18, 2023
1 parent f037795 commit 983105d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -1347,7 +1348,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
Data: []byte("should fail"),
OrderingKey: orderingKey,
})
if _, err := r.Get(ctx); err == nil || !strings.Contains(err.Error(), "pubsub: Publishing for ordering key") {
if _, err := r.Get(ctx); err == nil || !errors.As(err, &ErrPublishingPaused{}) {
t.Fatalf("expected ordering keys publish error, got %v", err)
}

Expand Down
21 changes: 17 additions & 4 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,8 @@ func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
}
}

var errTopicStopped = errors.New("pubsub: Stop has been called for this topic")
// ErrTopicStopped indicates that topic has been stopped and further publishing will fail.
var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")

// A PublishResult holds the result from a call to Publish.
//
Expand All @@ -548,6 +549,8 @@ var errTopicStopped = errors.New("pubsub: Stop has been called for this topic")
// }
type PublishResult = ipubsub.PublishResult

var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")

// Publish publishes msg to the topic asynchronously. Messages are batched and
// sent according to the topic's PublishSettings. Publish never blocks.
//
Expand All @@ -565,7 +568,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {

r := ipubsub.NewPublishResult()
if !t.EnableMessageOrdering && msg.OrderingKey != "" {
ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering"))
ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
return r
}

Expand All @@ -582,7 +585,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
defer t.mu.RUnlock()
// TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here
if t.stopped {
ipubsub.SetPublishResult(r, "", errTopicStopped)
ipubsub.SetPublishResult(r, "", ErrTopicStopped)
return r
}

Expand Down Expand Up @@ -697,6 +700,16 @@ func (t *Topic) initBundler() {
t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5
}

// ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key.
type ErrPublishingPaused struct {
OrderingKey string
}

func (e ErrPublishingPaused) Error() string {
return fmt.Sprintf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", e.OrderingKey)

}

func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
if err != nil {
Expand All @@ -716,7 +729,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
var res *pb.PublishResponse
start := time.Now()
if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
err = fmt.Errorf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", orderingKey)
err = ErrPublishingPaused{OrderingKey: orderingKey}
} else {
// Apply custom publish retryer on top of user specified retryer and
// default retryer.
Expand Down
25 changes: 21 additions & 4 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pubsub
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -167,8 +168,8 @@ func TestStopPublishOrder(t *testing.T) {
topic.Stop()
r := topic.Publish(ctx, &Message{})
_, err := r.Get(ctx)
if err != errTopicStopped {
t.Errorf("got %v, want errTopicStopped", err)
if !errors.Is(err, ErrTopicStopped) {
t.Errorf("got %v, want ErrTopicStopped", err)
}
}

Expand Down Expand Up @@ -460,8 +461,8 @@ func TestFlushStopTopic(t *testing.T) {
r5 := topic.Publish(ctx, &Message{
Data: []byte("this should fail"),
})
if _, err := r5.Get(ctx); err != errTopicStopped {
t.Errorf("got %v, want errTopicStopped", err)
if _, err := r5.Get(ctx); !errors.Is(err, ErrTopicStopped) {
t.Errorf("got %v, want ErrTopicStopped", err)
}
}

Expand Down Expand Up @@ -673,3 +674,19 @@ func addSingleResponse(srv *pstest.Server, id string) {
MessageIds: []string{id},
}, nil)
}

func TestPublishOrderingNotEnabled(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

topic, err := c.CreateTopic(ctx, "test-topic")
if err != nil {
t.Fatal(err)
}
res := publishSingleMessageWithKey(ctx, topic, "test", "non-existent-key")
if _, err := res.Get(ctx); !errors.Is(err, errTopicOrderingNotEnabled) {
t.Errorf("got %v, want errTopicOrderingNotEnabled", err)
}
}

0 comments on commit 983105d

Please sign in to comment.