Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat][CometD] Resolve Retry Error Handling #34327

Merged
merged 15 commits into from
Feb 22, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improved logging
  • Loading branch information
kush-elastic committed Feb 1, 2023
commit 40e824cd500ba7f65d9c77aca46249e00cc1752d
14 changes: 8 additions & 6 deletions x-pack/filebeat/input/cometd/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ func (in *cometdInput) Run() {
if err != nil {
in.log.Errorw("not able to get access token", "error", err)
// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)
continue
}

if err := in.run(); err != nil {
if in.workerCtx.Err() == nil {
in.log.Warnw("Restarting failed CometD input worker.", "error", err)
in.log.Errorw("Restarting failed CometD input worker.", "error", err)
// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)
continue
Expand Down Expand Up @@ -95,7 +94,7 @@ func (in *cometdInput) run() error {
// log warning every 5 seconds only to avoid to many unnecessary logs
select {
case <-ticker.C:
in.log.Warnw("Retrying...! facing issue while collecting data from CometD", "error", e.Error())
in.log.Errorw("Retrying...! facing issue while collecting data from CometD", "error", e.Error())
default:
}
} else if !e.Msg.Successful {
Expand All @@ -106,12 +105,14 @@ func (in *cometdInput) run() error {
if e.Msg.Data.Payload != nil {
msg, err = e.Msg.Data.Payload.MarshalJSON()
if err != nil {
return fmt.Errorf("JSON error: %w", err)
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else if e.Msg.Data.Object != nil {
msg, err = e.Msg.Data.Object.MarshalJSON()
if err != nil {
return fmt.Errorf("JSON error: %w", err)
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else {
// To handle the last response where the object received was empty
Expand All @@ -121,7 +122,8 @@ func (in *cometdInput) run() error {
// Extract event IDs from json.RawMessage
err = json.Unmarshal(msg, &event)
if err != nil {
return fmt.Errorf("error while parsing JSON: %w", err)
in.log.Errorw("error while parsing JSON", "error", err)
continue
}
if ok := in.outlet.OnEvent(makeEvent(event.EventId, e.Msg.Channel, string(msg))); !ok {
in.log.Debug("OnEvent returned false. Stopping input worker.")
Expand Down