Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use errors.Join to wrap multiple errors #1322

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.26.0
github.com/hashicorp/go-multierror v1.1.1
github.com/klauspost/compress v1.17.9
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.19.0
@@ -61,7 +60,6 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
@@ -213,11 +213,6 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hamba/avro/v2 v2.26.0 h1:IaT5l6W3zh7K67sMrT2+RreJyDTllBGVJm4+Hedk9qE=
github.com/hamba/avro/v2 v2.26.0/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
8 changes: 0 additions & 8 deletions pulsar/error.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ import (
"fmt"

proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/hashicorp/go-multierror"
)

// Result used to represent pulsar processing is an alias of type int.
@@ -255,10 +254,3 @@ func getErrorFromServerError(serverError *proto.ServerError) error {
return newError(UnknownError, serverError.String())
}
}

// joinErrors can join multiple errors into one error, and the returned error can be tested by errors.Is()
// we use github.com/hashicorp/go-multierror instead of errors.Join() of Go 1.20 so that we can compile pulsar
// go client with go versions that newer than go 1.13
func joinErrors(errs ...error) error {
return multierror.Append(nil, errs...)
}
2 changes: 1 addition & 1 deletion pulsar/error_test.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ func Test_joinErrors(t *testing.T) {
err1 := errors.New("err1")
err2 := errors.New("err2")
err3 := errors.New("err3")
err := joinErrors(ErrInvalidMessage, err1, err2)
err := errors.Join(ErrInvalidMessage, err1, err2)
assert.True(t, errors.Is(err, ErrInvalidMessage))
assert.True(t, errors.Is(err, err1))
assert.True(t, errors.Is(err, err2))
26 changes: 13 additions & 13 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
@@ -514,25 +514,25 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic not found, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicNotfound, err))
p.doClose(errors.Join(ErrTopicNotfound, err))
return struct{}{}, nil
}

if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicTerminated, err))
p.doClose(errors.Join(ErrTopicTerminated, err))
return struct{}{}, nil
}

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
return struct{}{}, nil
}

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.doClose(joinErrors(ErrProducerFenced, err))
p.doClose(errors.Join(ErrProducerFenced, err))
return struct{}{}, nil
}

@@ -1111,18 +1111,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil"))
return errors.Join(ErrInvalidMessage, fmt.Errorf("message is nil"))
}

if msg.Value != nil && msg.Payload != nil {
return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
return errors.Join(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return joinErrors(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
return errors.Join(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
}
}

@@ -1138,16 +1138,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
if txn.state.Load() != int32(TxnOpen) {
p.log.WithField("state", txn.state.Load()).Error("Failed to send message" +
" by a non-open transaction.")
return joinErrors(ErrTransaction,
return errors.Join(ErrTransaction,
fmt.Errorf("failed to send message by a non-open transaction"))
}

if err := txn.registerProducerTopic(p.topic); err != nil {
return joinErrors(ErrTransaction, err)
return errors.Join(ErrTransaction, err)
}

if err := txn.registerSendOrAckOp(); err != nil {
return joinErrors(ErrTransaction, err)
return errors.Join(ErrTransaction, err)
}

sr.transaction = txn
@@ -1173,7 +1173,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
return joinErrors(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
return errors.Join(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
@@ -1190,15 +1190,15 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
if sr.msg.Value != nil {
if sr.schema == nil {
p.log.Errorf("Schema encode message failed %s", sr.msg.Value)
return joinErrors(ErrSchema, fmt.Errorf("set schema value without setting schema"))
return errors.Join(ErrSchema, fmt.Errorf("set schema value without setting schema"))
}

// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
schemaPayload, err := sr.schema.Encode(sr.msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
return joinErrors(ErrSchema, err)
return errors.Join(ErrSchema, err)
}

sr.uncompressedPayload = schemaPayload
Loading