Skip to content

Commit

Permalink
Remove reroute processors from coverage reports (#1647)
Browse files Browse the repository at this point in the history
Remove the reroute processors from the pipeline stats to generate
the coverage reports. And at the same time, to be able to use the
right lines in the coverage reports, it is needed to keep the original
contents (the ones read from the file) of the pipeline.
  • Loading branch information
mrodm committed Jan 30, 2024
1 parent 8b4aaa0 commit 808efee
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 10 deletions.
11 changes: 6 additions & 5 deletions internal/elasticsearch/ingest/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,18 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
return nil, err
}

c, err = addRerouteProcessors(c, dataStreamPath, path)
cWithRerouteProcessors, err := addRerouteProcessors(c, dataStreamPath, path)
if err != nil {
return nil, err
}

name := filepath.Base(path)
pipelines = append(pipelines, Pipeline{
Path: path,
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
Format: filepath.Ext(path)[1:],
Content: c,
Path: path,
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
Format: filepath.Ext(path)[1:],
Content: cWithRerouteProcessors,
ContentOriginal: c,
})
}
return pipelines, nil
Expand Down
9 changes: 5 additions & 4 deletions internal/elasticsearch/ingest/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type pipelineIngestedDocument struct {

// Pipeline represents a pipeline resource loaded from a file
type Pipeline struct {
Path string // Path of the file with the pipeline definition.
Name string // Name of the pipeline.
Format string // Format (extension) of the pipeline.
Content []byte // Content is the original file contents.
Path string // Path of the file with the pipeline definition.
Name string // Name of the pipeline.
Format string // Format (extension) of the pipeline.
Content []byte // Content is the pipeline file contents with reroute processors if any.
ContentOriginal []byte // Content is the original file contents.
}

// Filename returns the original filename associated with the pipeline.
Expand Down
14 changes: 14 additions & 0 deletions internal/elasticsearch/ingest/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ func (p Pipeline) Processors() (procs []Processor, err error) {
return procs, nil
}

// OriginalProcessors return the original list of processors in an ingest pipeline.
func (p Pipeline) OriginalProcessors() (procs []Processor, err error) {
switch p.Format {
case "yaml", "yml", "json":
procs, err = processorsFromYAML(p.ContentOriginal)
default:
return nil, fmt.Errorf("unsupported pipeline format: %s", p.Format)
}
if err != nil {
return nil, fmt.Errorf("failure processing %s pipeline '%s': %w", p.Format, p.Filename(), err)
}
return procs, nil
}

// extract a list of processors from a pipeline definition in YAML format.
func processorsFromYAML(content []byte) (procs []Processor, err error) {
var p struct {
Expand Down
13 changes: 12 additions & 1 deletion internal/testrunner/runners/pipeline/coverage.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func GetPipelineCoverage(options testrunner.TestOptions, pipelines []ingest.Pipe

func pipelineDataForCoverage(pipeline ingest.Pipeline, stats ingest.PipelineStatsMap, basePath, dataStreamPath string) (string, string, []ingest.Processor, ingest.PipelineStats, error) {
// Load the list of main processors from the pipeline source code, annotated with line numbers.
src, err := pipeline.Processors()
src, err := pipeline.OriginalProcessors()
if err != nil {
return "", "", nil, ingest.PipelineStats{}, err
}
Expand All @@ -115,6 +115,17 @@ func pipelineDataForCoverage(pipeline ingest.Pipeline, stats ingest.PipelineStat
return "", "", nil, ingest.PipelineStats{}, fmt.Errorf("pipeline '%s' not installed in Elasticsearch", pipeline.Name)
}

// Remove reroute processors if any so the pipeline has the same processors as in the file
// reroute processors are added if there are any routing_rules file defined
var processors []ingest.ProcessorStats
for _, proc := range pstats.Processors {
if proc.Type == "reroute" {
continue
}
processors = append(processors, proc)
}
pstats.Processors = processors

// Ensure there is no inconsistency in the list of processors in stats vs obtained from source.
if len(src) != len(pstats.Processors) {
return "", "", nil, ingest.PipelineStats{}, fmt.Errorf("processor count mismatch for %s (src:%d stats:%d)", pipeline.Filename(), len(src), len(pstats.Processors))
Expand Down

0 comments on commit 808efee

Please sign in to comment.