Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up / document metrics monitor fields #39413

Merged
merged 18 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update / add tests
  • Loading branch information
faec committed May 25, 2024
commit 3ccb4f16c436412e6c0fe03a64ace394d933d9ed
16 changes: 2 additions & 14 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ func (client *Client) Clone() *Client {
}

func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error {
fmt.Printf("hi fae, Publish with %v events\n", len(batch.Events()))
span, ctx := apm.StartSpan(ctx, "publishEvents", "output")
defer span.End()
span.Context.SetLabel("events_original", len(batch.Events()))
Expand All @@ -224,8 +223,6 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error
if bulkResult.connErr != nil {
// If there was a connection-level error there is no per-item response,
// handle it and return.
fmt.Printf("hi fae, calling handleBulkResultError\n")
defer fmt.Printf("hi fae, returning from Publish\n")
return client.handleBulkResultError(ctx, batch, bulkResult)
}
span.Context.SetLabel("events_published", len(bulkResult.events))
Expand Down Expand Up @@ -258,7 +255,6 @@ func (client *Client) doBulkRequest(

rawEvents := batch.Events()

fmt.Printf("hi fae, doBulkRequest\n")
// encode events into bulk request buffer, dropping failed elements from
// events slice
resultEvents, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), rawEvents)
Expand All @@ -270,16 +266,13 @@ func (client *Client) doBulkRequest(
begin := time.Now()
result.status, result.response, result.connErr =
client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems)
fmt.Printf("hi fae, bulk status %v response %v\n", result.status, string(result.response))
if result.connErr == nil {
duration := time.Since(begin)
client.observer.ReportLatency(duration)
client.log.Debugf(
"doBulkRequest: %d events have been sent to elasticsearch in %v.",
len(result.events), duration)
}
} else {
fmt.Printf("hi fae, doBulkRequest had no events left after encoding\n")
}

return result
Expand All @@ -289,21 +282,16 @@ func (client *Client) handleBulkResultError(
ctx context.Context, batch publisher.Batch, bulkResult bulkResult,
) error {
if bulkResult.status == http.StatusRequestEntityTooLarge {
fmt.Printf("hi fae, got statusrequestentitytoolarge\n")
if batch.SplitRetry() {
fmt.Printf("hi fae, split a batch\n")
// Report that we split a batch
client.observer.BatchSplit()
client.observer.RetryableErrors(len(bulkResult.events))
} else {
fmt.Printf("hi fae, batch split failed\n")
// If the batch could not be split, there is no option left but
// to drop it and log the error state.
batch.Drop()
client.observer.PermanentErrors(len(bulkResult.events))
err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", bulkResult.connErr))
err.Send()
client.log.Error(err)
client.log.Error(errPayloadTooLarge)
}
// Don't propagate a too-large error since it doesn't indicate a problem
// with the connection.
Expand All @@ -320,6 +308,7 @@ func (client *Client) handleBulkResultError(
// All events were sent successfully
batch.ACK()
}
client.observer.RetryableErrors(len(bulkResult.events))
return bulkResult.connErr
}

Expand Down Expand Up @@ -413,7 +402,6 @@ func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, er
// Each of the events will be reported in the returned stats as exactly one of
// acked, duplicates, fails, nonIndexable, or deadLetter.
func (client *Client) bulkCollectPublishFails(bulkResult bulkResult) ([]publisher.Event, bulkResultStats) {
fmt.Printf("hi fae, bulkCollectPublishFails\n")
events := bulkResult.events

if len(bulkResult.events) == 0 {
Expand Down
Loading