Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wrap errors using %w to preserve context #1321

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oauth2/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func ExtractUserName(token oauth2.Token) (string, error) {
p := jwt.Parser{}
claims := jwt.MapClaims{}
if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
return "", fmt.Errorf("unable to decode the access token: %v", err)
return "", fmt.Errorf("unable to decode the access token: %w", err)
}
username, ok := claims[ClaimNameUserName]
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions oauth2/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
// load from the store and use the access token if it isn't expired
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
return nil, fmt.Errorf("LoadGrant: %v", err)
return nil, fmt.Errorf("LoadGrant: %w", err)
}
t.token = grant.Token
if t.token != nil && t.validateAccessToken(*t.token) {
Expand All @@ -90,13 +90,13 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
// obtain and cache a fresh access token
grant, err = t.refresher.Refresh(grant)
if err != nil {
return nil, fmt.Errorf("RefreshGrant: %v", err)
return nil, fmt.Errorf("RefreshGrant: %w", err)
}
t.token = grant.Token
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
return nil, fmt.Errorf("SaveGrant: %v", err)
return nil, fmt.Errorf("SaveGrant: %w", err)
}

return t.token, nil
Expand All @@ -117,14 +117,14 @@ func (t *tokenCache) InvalidateToken() error {
}
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
return fmt.Errorf("LoadGrant: %v", err)
return fmt.Errorf("LoadGrant: %w", err)
}
if grant.Token != nil && grant.Token.AccessToken == previous.AccessToken {
grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
return fmt.Errorf("SaveGrant: %v", err)
return fmt.Errorf("SaveGrant: %w", err)
}
}
return nil
Expand Down
13 changes: 7 additions & 6 deletions oauth2/store/keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package store
import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -92,7 +93,7 @@ func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, e

item, err := f.getItem(audience)
if err != nil {
if err == keyring.ErrKeyNotFound {
if errors.Is(err, keyring.ErrKeyNotFound) {
return nil, ErrNoAuthenticationData
}
return nil, err
Expand All @@ -119,10 +120,10 @@ func (f *KeyringStore) WhoAmI(audience string) (string, error) {
key := hashKeyringKey(audience)
authItem, err := f.kr.Get(key)
if err != nil {
if err == keyring.ErrKeyNotFound {
if errors.Is(err, keyring.ErrKeyNotFound) {
return "", ErrNoAuthenticationData
}
return "", fmt.Errorf("unable to get information from the keyring: %v", err)
return "", fmt.Errorf("unable to get information from the keyring: %w", err)
}
return authItem.Label, nil
}
Expand All @@ -134,13 +135,13 @@ func (f *KeyringStore) Logout() error {
var err error
keys, err := f.kr.Keys()
if err != nil {
return fmt.Errorf("unable to get information from the keyring: %v", err)
return fmt.Errorf("unable to get information from the keyring: %w", err)
}
for _, key := range keys {
err = f.kr.Remove(key)
}
if err != nil {
return fmt.Errorf("unable to update the keyring: %v", err)
return fmt.Errorf("unable to update the keyring: %w", err)
}
return nil
}
Expand Down Expand Up @@ -180,7 +181,7 @@ func (f *KeyringStore) setItem(item storedItem) error {
}
err = f.kr.Set(i)
if err != nil {
return fmt.Errorf("unable to update the keyring: %v", err)
return fmt.Errorf("unable to update the keyring: %w", err)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion oauth2/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// ErrNoAuthenticationData indicates that stored authentication data is not available
var ErrNoAuthenticationData = errors.New("authentication data is not available")

// ErrUnsupportedAuthData ndicates that stored authentication data is unusable
// ErrUnsupportedAuthData indicates that stored authentication data is unusable
var ErrUnsupportedAuthData = errors.New("authentication data is not usable")

// Store is responsible for persisting authorization grants
Expand Down
20 changes: 10 additions & 10 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,21 +1158,21 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
// error decrypting the payload
if err != nil {
// default crypto failure action
crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
if pc.options.decryption != nil {
crypToFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
cryptoFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
}

switch crypToFailureAction {
switch cryptoFailureAction {
case crypto.ConsumerCryptoFailureActionFail:
pc.log.Errorf("consuming message failed due to decryption err :%v", err)
pc.log.Errorf("consuming message failed due to decryption err: %v", err)
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
return err
case crypto.ConsumerCryptoFailureActionDiscard:
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
return fmt.Errorf("discarding message on decryption error :%v", err)
return fmt.Errorf("discarding message on decryption error: %w", err)
case crypto.ConsumerCryptoFailureActionConsume:
pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
pc.log.Warnf("consuming encrypted message due to error in decryption: %v", err)
messages := []*message{
{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
Expand Down Expand Up @@ -1767,16 +1767,16 @@ func (pc *partitionConsumer) runEventsLoop() {
func (pc *partitionConsumer) internalClose(req *closeRequest) {
defer close(req.doneCh)
state := pc.getConsumerState()
if state != consumerReady {
// this might be redundant but to ensure nack tracker is closed
if state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
return
}

if state == consumerClosed || state == consumerClosing {
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if state != consumerReady {
// this might be redundant but to ensure nack tracker is closed
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
Expand Down
14 changes: 7 additions & 7 deletions pulsar/crypto/default_message_crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName string, keyReader KeyR
d.cipherLock.Lock()
defer d.cipherLock.Unlock()
if keyName == "" || keyReader == nil {
return fmt.Errorf("keyname or keyreader is null")
return fmt.Errorf("keyname or keyreader is nil")
}

// read the public key and its info using keyReader
Expand Down Expand Up @@ -212,7 +212,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
payload []byte,
keyReader KeyReader) ([]byte, error) {
// if data key is present, attempt to derypt using the existing key
// if data key is present, attempt to decrypt using the existing key
if d.dataKey != nil {
decryptedData, err := d.getKeyAndDecryptData(msgMetadata, payload)
if err != nil {
Expand Down Expand Up @@ -342,20 +342,20 @@ func (d *DefaultMessageCrypto) loadPrivateKey(key []byte) (gocrypto.PrivateKey,

// read the public key into RSA key
func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey, error) {
var publickKey gocrypto.PublicKey
var publicKey gocrypto.PublicKey

pubPem, _ := pem.Decode(key)
if pubPem == nil {
return publickKey, fmt.Errorf("failed to decode public key")
return publicKey, fmt.Errorf("failed to decode public key")
}

genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
if err != nil {
return publickKey, err
return publicKey, err
}
publickKey = genericPublicKey
publicKey = genericPublicKey

return publickKey, nil
return publicKey, nil
}

func generateDataKey() ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func serializeMessage(wb Buffer,
encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
if err != nil {
// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error: %w", err)
}

cmdSize := uint32(proto.Size(cmdSend))
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ func (c *connection) handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi
resourceID := commandTopicMigrated.GetResourceId()
migratedBrokerServiceURL := c.getMigratedBrokerServiceURL(commandTopicMigrated)
if migratedBrokerServiceURL == "" {
c.log.Warnf("Failed to find the migrated broker url for resource: %s, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
c.log.Warnf("Failed to find the migrated broker url for resource: %d, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
resourceID,
commandTopicMigrated.GetBrokerServiceUrl(),
commandTopicMigrated.GetBrokerServiceUrlTls())
Expand Down
6 changes: 3 additions & 3 deletions pulsar/internal/crypto/producer_encryptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetad
crypto.NewMessageMetadataSupplier(msgMetadata),
payload)

// error encryping the payload
// error encrypting the payload
if err != nil {
// error occurred in encrypting the payload
// crypto ProducerCryptoFailureAction is set to send
// send unencrypted message
// unencrypted message
if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
e.logger.
WithError(err).
Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send")
return payload, nil
}

return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload: %w", err)
}
return encryptedPayload, nil
}
4 changes: 2 additions & 2 deletions pulsar/primitiveSerDe.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64,

func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
if len(buf) < 8 {
return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
return 0, fmt.Errorf("cannot decode binary double: %w", io.ErrShortBuffer)
}
return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
}

func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
if len(buf) < 4 {
return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
return 0, fmt.Errorf("cannot decode binary float: %w", io.ErrShortBuffer)
}
return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
true,
client.log.SubLogger(log.Fields{"topic": p.topic}))
if err != nil {
return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %v", err)
return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %w", err)
}
p.options.Encryption.MessageCrypto = messageCrypto
}
Expand Down
34 changes: 17 additions & 17 deletions pulsar/table_view_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type TableViewImpl struct {
dataMu sync.Mutex
data map[string]interface{}

readersMu sync.Mutex
cancelRaders map[string]cancelReader
readersMu sync.Mutex
cancelReaders map[string]cancelReader

listenersMu sync.Mutex
listeners []func(string, interface{}) error
Expand Down Expand Up @@ -73,12 +73,12 @@ func newTableView(client *client, options TableViewOptions) (TableView, error) {
}

tv := TableViewImpl{
client: client,
options: options,
data: make(map[string]interface{}),
cancelRaders: make(map[string]cancelReader),
logger: logger,
closedCh: make(chan struct{}),
client: client,
options: options,
data: make(map[string]interface{}),
cancelReaders: make(map[string]cancelReader),
logger: logger,
closedCh: make(chan struct{}),
}

// Do an initial round of partition update check to make sure we can populate the partition readers
Expand All @@ -104,16 +104,16 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
tv.readersMu.Lock()
defer tv.readersMu.Unlock()

for partition, cancelReader := range tv.cancelRaders {
for partition, cancelReader := range tv.cancelReaders {
if _, ok := partitions[partition]; !ok {
cancelReader.cancelFunc()
cancelReader.reader.Close()
delete(tv.cancelRaders, partition)
delete(tv.cancelReaders, partition)
}
}

for partition := range partitions {
if _, ok := tv.cancelRaders[partition]; !ok {
if _, ok := tv.cancelReaders[partition]; !ok {
reader, err := newReader(tv.client, ReaderOptions{
Topic: partition,
StartMessageID: EarliestMessageID(),
Expand All @@ -127,14 +127,14 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
tv.logger.Errorf("read next message failed for %s: %w", partition, err)
tv.logger.Errorf("read next message failed for %s: %v", partition, err)
}
if msg != nil {
tv.handleMessage(msg)
}
}
ctx, cancelFunc := context.WithCancel(context.Background())
tv.cancelRaders[partition] = cancelReader{
tv.cancelReaders[partition] = cancelReader{
reader: reader,
cancelFunc: cancelFunc,
}
Expand All @@ -148,7 +148,7 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
for {
if err := tv.partitionUpdateCheck(); err != nil {
tv.logger.Errorf("failed to check for changes in number of partitions: %w", err)
tv.logger.Errorf("failed to check for changes in number of partitions: %v", err)
}
select {
case <-tv.closedCh:
Expand Down Expand Up @@ -236,7 +236,7 @@ func (tv *TableViewImpl) Close() {

if !tv.closed {
tv.closed = true
for _, cancelReader := range tv.cancelRaders {
for _, cancelReader := range tv.cancelReaders {
cancelReader.reader.Close()
}
close(tv.closedCh)
Expand All @@ -259,7 +259,7 @@ func (tv *TableViewImpl) handleMessage(msg Message) {

for _, listener := range tv.listeners {
if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
tv.logger.Errorf("table view listener failed for %v: %v", msg, err)
}
}
}
Expand All @@ -268,7 +268,7 @@ func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader R
for {
msg, err := reader.Next(ctx)
if err != nil {
tv.logger.Errorf("read next message failed for %s: %w", reader.Topic(), err)
tv.logger.Errorf("read next message failed for %s: %v", reader.Topic(), err)
}
var e *Error
if (errors.As(err, &e) && e.Result() == ConsumerClosed) || errors.Is(err, context.Canceled) {
Expand Down
Loading