Skip to content

Commit

Permalink
Merge pull request moov-io#165 from moov-io/autocommit-determines-ack…
Browse files Browse the repository at this point in the history
…-order

pipeline: Let .AutoCommit determine when messages are committed
  • Loading branch information
adamdecaf authored Mar 9, 2023
2 parents 4a3b9af + c6b56c9 commit 4af57d6
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion internal/pipeline/file_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,16 @@ func contains(err error, options ...string) bool {
}

func (fr *FileReceiver) processMessage(msg *pubsub.Message) error {
msg.Ack()
// AutoCommit is a setting which will acknowledge messages with the pubsub service
// immediately after receiving it. With this disabled messages are committed after
// successful processing.
//
// Uncommitted messages will be redelivered and reprocessed, which can delay or
// pause processing.
committed := fr.shouldAutocommit()
if committed {
msg.Ack()
}

data := msg.Body
var err error
Expand All @@ -252,6 +261,9 @@ func (fr *FileReceiver) processMessage(msg *pubsub.Message) error {
"type": log.String(fmt.Sprintf("%T", evt)),
}).LogError(err).Err()
}
if !committed {
msg.Ack()
}
return nil

case *models.QueueACHFile:
Expand All @@ -262,6 +274,9 @@ func (fr *FileReceiver) processMessage(msg *pubsub.Message) error {
"type": log.String(fmt.Sprintf("%T", evt)),
}).LogError(err).Err()
}
if !committed {
msg.Ack()
}
return nil

case *models.CancelACHFile:
Expand All @@ -271,13 +286,24 @@ func (fr *FileReceiver) processMessage(msg *pubsub.Message) error {
"type": log.String(fmt.Sprintf("%T", evt)),
}).LogError(err).Err()
}
if !committed {
msg.Ack()
}
return nil
}

// Unhandled Message
return fr.wrappedErrorLogger(msg).LogError(errors.New("unhandled message")).Err()
}

func (fr *FileReceiver) shouldAutocommit() bool {
kafkaConfig := fr.cfg.Inbound.Kafka
if kafkaConfig == nil {
return false
}
return kafkaConfig.AutoCommit
}

func (fr *FileReceiver) wrappedErrorLogger(msg *pubsub.Message) log.Logger {
logger := fr.logger.With(log.Fields{
"loggableID": log.String(msg.LoggableID),
Expand Down

0 comments on commit 4af57d6

Please sign in to comment.