Skip to content

Commit c51f18e

Browse files
committed
Take into account on-failure handlers
1 parent 1029807 commit c51f18e

File tree

1 file changed

+38
-3
lines changed

1 file changed

+38
-3
lines changed

internal/elasticsearch/ingest/pipeline.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"io"
1414
"net/http"
1515
"strings"
16+
"time"
1617

1718
"gopkg.in/yaml.v3"
1819

@@ -46,9 +47,24 @@ type verboseProcessorError struct {
4647
RootCause json.RawMessage `json:"root_cause"`
4748
}
4849

50+
func (e verboseProcessorError) Error() string {
51+
return fmt.Sprintf("[%s] %s", e.Type, e.Reason)
52+
}
53+
4954
type pipelineDocument struct {
50-
Index string `json:"_index"`
51-
Source json.RawMessage `json:"_source"`
55+
Index string `json:"_index"`
56+
Source json.RawMessage `json:"_source"`
57+
Ingest verboseProcessorIngest `json:"_ingest"`
58+
}
59+
60+
type verboseProcessorIngest struct {
61+
Pipeline string `json:"pipeline"`
62+
Timestamp time.Time `json:"timestamp"`
63+
64+
OnFailurePipeline string `json:"on_failure_pipeline"`
65+
OnFailureMessage string `json:"on_failure_message"`
66+
OnFailureProcessorTag string `json:"on_failure_processor_tag"`
67+
OnFailureProcessorType string `json:"on_failure_processor_type"`
5268
}
5369

5470
// Pipeline represents a pipeline resource loaded from a file
@@ -128,12 +144,31 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName
128144
return nil, fmt.Errorf("unmarshalling simulate request failed: %w", err)
129145
}
130146

147+
handleErrors := func(ingest verboseProcessorIngest, errs []error) []error {
148+
var filtered []error
149+
for _, err := range errs {
150+
var processorError verboseProcessorError
151+
if errors.As(err, &processorError) && processorError.Reason == ingest.OnFailureMessage {
152+
continue
153+
}
154+
filtered = append(filtered, err)
155+
}
156+
return filtered
157+
}
158+
131159
processedEvents := make([]json.RawMessage, len(response.Docs))
132160
var errs []error
133161
for i, doc := range response.Docs {
134162
var source json.RawMessage
135163
failed := false
136164
for _, result := range doc.ProcessorResults {
165+
if result.Doc.Ingest.OnFailureMessage != "" {
166+
// This processor is in an on_failure handler, filter out the handled errors
167+
// and assume that processing is going on.
168+
errs = handleErrors(result.Doc.Ingest, errs)
169+
failed = false
170+
}
171+
137172
switch result.Status {
138173
case "success":
139174
// Keep last successful document.
@@ -147,7 +182,7 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName
147182
continue
148183
case "error":
149184
failed = true
150-
errs = append(errs, fmt.Errorf("error in processor %s: [%s] %s", result.Processor, result.Error.Type, result.Error.Reason))
185+
errs = append(errs, fmt.Errorf("error in pricessor %s: %w", result.Processor, result.Error))
151186
case "failed":
152187
failed = true
153188
errs = append(errs, fmt.Errorf("%q processor failed", result.Processor))

0 commit comments

Comments
 (0)