Skip to content

Commit 9d2902f

Browse files
committed
processor ends when next one start
1 parent 6d8c1a8 commit 9d2902f

File tree

2 files changed

+190
-69
lines changed

2 files changed

+190
-69
lines changed

internal/elasticsearch/ingest/processors.go

Lines changed: 77 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
package ingest
66

77
import (
8+
"bufio"
9+
"bytes"
810
"fmt"
9-
"strings"
1011

1112
"gopkg.in/yaml.v3"
1213
)
@@ -59,6 +60,7 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) {
5960
if err = yaml.Unmarshal(content, &p); err != nil {
6061
return nil, err
6162
}
63+
6264
for idx, entry := range p.Processors {
6365
if entry.Kind != yaml.MappingNode || len(entry.Content) != 2 {
6466
return nil, fmt.Errorf("processor#%d is not a single-key map (kind:%v content:%d)", idx, entry.Kind, len(entry.Content))
@@ -71,28 +73,88 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) {
7173
return nil, fmt.Errorf("error decoding processor#%d type: %w", idx, err)
7274
}
7375
proc.FirstLine = entry.Line
74-
proc.LastLine = lastLine(&entry)
76+
lastLine, err := getProcessorLastLine(idx, p.Processors, proc, content)
77+
if err != nil {
78+
return nil, err
79+
}
80+
proc.LastLine = lastLine
81+
7582
procs = append(procs, proc)
7683
}
77-
return procs, nil
84+
return procs, err
85+
}
86+
87+
// getProcessorLastLine determines the last line number for the given processor.
88+
func getProcessorLastLine(idx int, processors []yaml.Node, currentProcessor Processor, content []byte) (int, error) {
89+
if idx < len(processors)-1 {
90+
var endProcessor = processors[idx+1].Line - 1
91+
if endProcessor < currentProcessor.FirstLine {
92+
return currentProcessor.FirstLine, nil
93+
} else {
94+
return processors[idx+1].Line - 1, nil
95+
}
96+
}
97+
98+
return nextProcessorOrEndOfPipeline(content)
7899
}
79100

80-
// lastLine returns the last (greater) line number used by a yaml.Node.
81-
func lastLine(node *yaml.Node) int {
101+
// lastProcessorLine get the line before the node after the processors node. If there is none, it returns the end of file line
102+
func nextProcessorOrEndOfPipeline(content []byte) (int, error) {
103+
var root yaml.Node
104+
if err := yaml.Unmarshal(content, &root); err != nil {
105+
return 0, fmt.Errorf("error unmarshaling YAML: %v", err)
106+
}
107+
108+
var nodes []*yaml.Node
109+
extractNodesFromMapping(&root, &nodes)
110+
for i, node := range nodes {
111+
112+
if node.Value == "processors" {
113+
if i < len(nodes)-1 {
114+
115+
return nodes[i+1].Line - 1, nil
116+
}
117+
}
118+
119+
}
120+
return countLinesInBytes(content)
121+
}
122+
123+
// extractNodesFromMapping recursively extracts all nodes from MappingNodes within DocumentNodes.
124+
func extractNodesFromMapping(node *yaml.Node, nodes *[]*yaml.Node) {
82125
if node == nil {
83-
return 0
126+
return
84127
}
85-
last := node.Line
86-
for _, inner := range node.Content {
87-
if line := lastLine(inner); line > last {
88-
last = line
128+
129+
if node.Kind == yaml.DocumentNode {
130+
for _, child := range node.Content {
131+
extractNodesFromMapping(child, nodes)
89132
}
90-
// For scalar node with multiline content, calculate the last line based on line breaks
91-
if inner.Kind == yaml.ScalarNode && strings.Contains(inner.Value, "\n") {
92-
lineCount := strings.Count(inner.Value, "\n")
93-
last += lineCount
133+
return
134+
}
135+
136+
if node.Kind == yaml.MappingNode {
137+
for _, child := range node.Content {
138+
if child.Kind == yaml.MappingNode || child.Kind == yaml.ScalarNode {
139+
*nodes = append(*nodes, child)
140+
}
141+
extractNodesFromMapping(child, nodes)
94142
}
95143
}
144+
}
145+
146+
// countLinesInBytes counts the number of lines in the given byte slice.
147+
func countLinesInBytes(data []byte) (int, error) {
148+
scanner := bufio.NewScanner(bytes.NewReader(data))
149+
lineCount := 0
150+
151+
for scanner.Scan() {
152+
lineCount++
153+
}
154+
155+
if err := scanner.Err(); err != nil {
156+
return 0, fmt.Errorf("error reading data: %w", err)
157+
}
96158

97-
return last
159+
return lineCount, nil
98160
}

internal/elasticsearch/ingest/processors_test.go

Lines changed: 113 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -24,61 +24,104 @@ func TestResource_Processors(t *testing.T) {
2424
content: []byte(`---
2525
description: Made up pipeline
2626
processors:
27-
# First processor.
28-
- grok:
29-
tag: Extract header
30-
field: message
31-
patterns:
32-
- \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client
33-
%{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message}
34-
- \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\]
35-
\[pid %{NUMBER:process.pid:long}(:tid %{NUMBER:process.thread.id:long})?\](
36-
\[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message}
37-
pattern_definitions:
38-
APACHE_TIME: '%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}'
39-
ignore_missing: true
27+
- grok:
28+
tag: Extract header
29+
field: message
30+
patterns:
31+
- \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client%{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message}
32+
- \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\]
33+
ignore_missing: true
4034
41-
- date:
42-
field: apache.error.timestamp
43-
target_field: '@timestamp'
44-
formats:
45-
- EEE MMM dd H:m:s yyyy
46-
- EEE MMM dd H:m:s.SSSSSS yyyy
47-
on_failure:
48-
- append:
35+
- date:
36+
field: apache.error.timestamp
37+
target_field: '@timestamp'
38+
formats:
39+
- EEE MMM dd H:m:s yyyy
40+
- EEE MMM dd H:m:s.SSSSSS yyyy
41+
on_failure:
42+
- append:
43+
field: error.message
44+
value: '{{ _ingest.on_failure_message }}'
45+
- set:
46+
description: Set event category
47+
field: event.category
48+
value: web
49+
# Some script
50+
- script:
51+
lang: painless
52+
53+
- grok:
54+
field: source.address
55+
ignore_missing: true
56+
patterns:
57+
- ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$
58+
- rename:
59+
field: source.as.organization_name
60+
target_field: source.as.organization.name
61+
ignore_missing: true
62+
on_failure:
63+
- set:
4964
field: error.message
5065
value: '{{ _ingest.on_failure_message }}'
51-
- set:
52-
description: Set event category
53-
field: event.category
54-
value: web
55-
# Some script
56-
- script:
57-
lang: painless
58-
source: >-
59-
[...]
66+
`),
67+
expected: []Processor{
68+
{Type: "grok", FirstLine: 4, LastLine: 11},
69+
{Type: "date", FirstLine: 12, LastLine: 21},
70+
{Type: "set", FirstLine: 22, LastLine: 26},
71+
{Type: "script", FirstLine: 27, LastLine: 29},
72+
{Type: "grok", FirstLine: 30, LastLine: 34},
73+
{Type: "rename", FirstLine: 35, LastLine: 38},
74+
},
75+
},
76+
{
77+
name: "Yaml pipeline",
78+
format: "yml",
79+
content: []byte(`---
80+
description: Made up pipeline
81+
processors:
82+
- grok:
83+
tag: Extract header
84+
field: message
85+
patterns:
86+
- \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client%{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message}
87+
- \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\]
88+
ignore_missing: true
6089
61-
- grok:
62-
field: source.address
63-
ignore_missing: true
64-
patterns:
65-
- ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$
66-
- rename:
67-
field: source.as.organization_name
68-
target_field: source.as.organization.name
69-
ignore_missing: true
70-
on_failure:
71-
- set:
72-
field: error.message
73-
value: '{{ _ingest.on_failure_message }}'
90+
- date:
91+
field: apache.error.timestamp
92+
target_field: '@timestamp'
93+
formats:
94+
- EEE MMM dd H:m:s yyyy
95+
- EEE MMM dd H:m:s.SSSSSS yyyy
96+
on_failure:
97+
- append:
98+
field: error.message
99+
value: '{{ _ingest.on_failure_message }}'
100+
- set:
101+
description: Set event category
102+
field: event.category
103+
value: web
104+
# Some script
105+
- script:
106+
lang: painless
107+
108+
- grok:
109+
field: source.address
110+
ignore_missing: true
111+
patterns:
112+
- ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$
113+
- rename:
114+
field: source.as.organization_name
115+
target_field: source.as.organization.name
116+
ignore_missing: true
74117
`),
75118
expected: []Processor{
76-
{Type: "grok", FirstLine: 5, LastLine: 16},
77-
{Type: "date", FirstLine: 18, LastLine: 27},
78-
{Type: "set", FirstLine: 28, LastLine: 31},
79-
{Type: "script", FirstLine: 33, LastLine: 35},
80-
{Type: "grok", FirstLine: 38, LastLine: 42},
81-
{Type: "rename", FirstLine: 43, LastLine: 46},
119+
{Type: "grok", FirstLine: 4, LastLine: 11},
120+
{Type: "date", FirstLine: 12, LastLine: 21},
121+
{Type: "set", FirstLine: 22, LastLine: 26},
122+
{Type: "script", FirstLine: 27, LastLine: 29},
123+
{Type: "grok", FirstLine: 30, LastLine: 34},
124+
{Type: "rename", FirstLine: 35, LastLine: 38},
82125
},
83126
},
84127
{
@@ -109,10 +152,10 @@ on_failure:
109152
`),
110153
expected: []Processor{
111154
{Type: "drop", FirstLine: 3, LastLine: 3},
112-
{Type: "set", FirstLine: 4, LastLine: 7},
155+
{Type: "set", FirstLine: 4, LastLine: 8},
113156
{Type: "remove", FirstLine: 9, LastLine: 9},
114157
{Type: "set", FirstLine: 9, LastLine: 9},
115-
{Type: "set", FirstLine: 10, LastLine: 13},
158+
{Type: "set", FirstLine: 10, LastLine: 15},
116159
},
117160
},
118161
{
@@ -155,7 +198,7 @@ on_failure:
155198
"processors": [{"drop": {"if":"ctx.drop!=null"}}]
156199
}`),
157200
expected: []Processor{
158-
{Type: "drop", FirstLine: 3, LastLine: 3},
201+
{Type: "drop", FirstLine: 3, LastLine: 4},
159202
},
160203
},
161204
{
@@ -173,7 +216,7 @@ on_failure:
173216
]
174217
}`),
175218
expected: []Processor{
176-
{Type: "script", FirstLine: 3, LastLine: 10},
219+
{Type: "script", FirstLine: 3, LastLine: 11},
177220
// Source will be processed as multiline:
178221
// "source": """
179222
// String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);
@@ -222,6 +265,22 @@ processors:
222265
{Type: "script", FirstLine: 3, LastLine: 6},
223266
},
224267
},
268+
{
269+
name: "Yaml script with empty line characters",
270+
format: "yml",
271+
content: []byte(`---
272+
processors:
273+
- script:
274+
description: Do something.
275+
tag: script_drop_null_empty_values
276+
lang: painless
277+
source: "def a = b \n
278+
; def b = 2; \n"
279+
`),
280+
expected: []Processor{
281+
{Type: "script", FirstLine: 3, LastLine: 8},
282+
},
283+
},
225284
{
226285
name: "Yaml empty processor",
227286
format: "yml",
@@ -249,7 +308,7 @@ processors:
249308
def b = 2;
250309
`),
251310
expected: []Processor{
252-
{Type: "set", FirstLine: 4, LastLine: 6},
311+
{Type: "set", FirstLine: 4, LastLine: 8},
253312
{Type: "script", FirstLine: 9, LastLine: 12},
254313
},
255314
},

0 commit comments

Comments
 (0)