Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Handle error message in handleS3Objects function #15545

Merged
merged 4 commits into from
Jan 14, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Handle error message in handleS3Objects function
  • Loading branch information
kaiyan-sheng committed Jan 14, 2020
commit 9a260a1016bebca06d1087791689a9de6a949e0c
7 changes: 5 additions & 2 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (p *s3Input) Wait() {
func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) {
var wg sync.WaitGroup
numMessages := len(messages)
p.logger.Debugf("Processing %v messages", numMessages)
wg.Add(numMessages * 2)

// process messages received from sqs
Expand All @@ -257,7 +258,7 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w
if err != nil {
err = errors.Wrap(err, "handleS3Objects failed")
p.logger.Error(err)
errC <- err
return
}
}

Expand Down Expand Up @@ -408,7 +409,9 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
}
return nil
} else if err != nil {
return errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
s3Context.Fail(err)
return err
}

// create event per log line
Expand Down