Skip to content

Commit bed4eff

Browse files
authored
Add ability for local output configuration to add to policy configuration (#8766)
* Add ability for local output configuration to override policy configuration. * Fix import. * Add extra test for variables. * Improvements from code review. * Disable errorlint on isMissingError.
1 parent 676e18d commit bed4eff

File tree

4 files changed

+320
-15
lines changed

4 files changed

+320
-15
lines changed

internal/pkg/agent/application/application.go

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"go.elastic.co/apm/v2"
1313

14+
"github.com/elastic/go-ucfg"
15+
1416
"github.com/elastic/elastic-agent-libs/logp"
1517

1618
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
@@ -215,7 +217,7 @@ func New(
215217
if err != nil {
216218
return nil, nil, nil, err
217219
}
218-
configMgr = coordinator.NewConfigPatchManager(managed, PatchAPMConfig(log, rawConfig))
220+
configMgr = coordinator.NewConfigPatchManager(managed, injectOutputOverrides(log, rawConfig), PatchAPMConfig(log, rawConfig))
219221
}
220222
}
221223

@@ -305,3 +307,88 @@ func mergeFleetConfig(ctx context.Context, rawConfig *config.Config) (storage.St
305307

306308
return store, cfg, nil
307309
}
310+
311+
// injectOutputOverrides takes local configuration for specific outputs and applies them to the configuration.
312+
//
313+
// The name of the output must match or no options will be overwritten.
314+
func injectOutputOverrides(log *logger.Logger, rawConfig *config.Config) func(change coordinator.ConfigChange) coordinator.ConfigChange {
315+
// merging uses no resolving as the AST variable substitution occurs on the outputs
316+
// append the values to arrays (don't allow complete overriding of arrays)
317+
mergeOpts := config.NoResolveOptions
318+
mergeOpts = append(mergeOpts, ucfg.AppendValues)
319+
320+
// parse the outputs defined local in the configuration
321+
// in the case the configuration as no outputs defined (most cases) then noop can be used
322+
var parsed struct {
323+
Outputs map[string]*ucfg.Config `config:"outputs"`
324+
}
325+
err := rawConfig.UnpackTo(&parsed)
326+
if err != nil {
327+
log.Errorf("error decoding raw config, output injection disabled: %v", err)
328+
return noop
329+
}
330+
if len(parsed.Outputs) == 0 {
331+
return noop
332+
}
333+
334+
return func(change coordinator.ConfigChange) coordinator.ConfigChange {
335+
cfg := change.Config()
336+
outputs, err := cfg.Agent.Child("outputs", -1)
337+
if err != nil {
338+
if !isMissingError(err) {
339+
// expecting only ErrMissing
340+
log.Errorf("error getting outputs from config: %v", err)
341+
}
342+
return change
343+
}
344+
for outputName, outputOverrides := range parsed.Outputs {
345+
cfgOutput, err := outputs.Child(outputName, -1)
346+
if err != nil {
347+
// no output with that name; do nothing
348+
continue
349+
}
350+
// the order of merging is important
351+
//
352+
// this merges the ConfigChange on-top of the rawConfig to ensure that the
353+
// ConfigChange options always override local options
354+
//
355+
// meaning that local options are only applied in the case that the ConfigChange
356+
// doesn't provide a different value for those fields
357+
err = func() error {
358+
clone, err := ucfg.NewFrom(outputOverrides, mergeOpts...)
359+
if err != nil {
360+
return fmt.Errorf("failed to clone output overrides: %w", err)
361+
}
362+
err = clone.Merge(cfgOutput, mergeOpts...)
363+
if err != nil {
364+
return fmt.Errorf("failed to merge output over overrides: %w", err)
365+
}
366+
err = outputs.SetChild(outputName, -1, clone, mergeOpts...)
367+
if err != nil {
368+
return fmt.Errorf("failed to re-set output with overrides: %w", err)
369+
}
370+
return nil
371+
}()
372+
if err != nil {
373+
log.Errorf("failed to perform output injection for output %s: %v", outputName, err)
374+
continue
375+
}
376+
log.Infof("successfully injected output overrides for output %s", outputName)
377+
}
378+
return change
379+
}
380+
}
381+
382+
// isMissingError returns true if the error is because the field is missing
383+
//
384+
// Sadly go-ucfg doesn't support Unwrap interface so using `errors.Is(err, ucfg.ErrMissing)` doesn't work
385+
// this specific function is required to ensure its an `ErrMissing` error.
386+
func isMissingError(err error) bool {
387+
//nolint:errorlint // limitation of go-ucfg (read docstring)
388+
switch v := err.(type) {
389+
case ucfg.Error:
390+
//nolint:errorlint // limitation of go-ucfg (read docstring)
391+
return v.Reason() == ucfg.ErrMissing
392+
}
393+
return false
394+
}

internal/pkg/agent/application/application_test.go

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,218 @@ func TestLimitsLog(t *testing.T) {
8686
logs := obs.FilterMessageSnippet(expLogLine)
8787
require.Equalf(t, 1, logs.Len(), "expected one log message about limits change")
8888
}
89+
90+
func TestInjectOutputOverrides(t *testing.T) {
91+
scenarios := []struct {
92+
Name string
93+
RawConfig map[string]any
94+
ChangeConfig map[string]any
95+
Result map[string]any
96+
}{
97+
{
98+
Name: "rawConfig no outputs",
99+
RawConfig: map[string]any{
100+
"inputs": []any{},
101+
},
102+
ChangeConfig: map[string]any{
103+
"outputs": map[string]any{
104+
"default": map[string]any{
105+
"type": "elasticsearch",
106+
},
107+
},
108+
},
109+
Result: map[string]any{
110+
"outputs": map[string]any{
111+
"default": map[string]any{
112+
"type": "elasticsearch",
113+
},
114+
},
115+
},
116+
},
117+
{
118+
Name: "change config no outputs",
119+
RawConfig: map[string]any{
120+
"outputs": map[string]any{
121+
"default": map[string]any{
122+
"type": "elasticsearch",
123+
},
124+
},
125+
},
126+
ChangeConfig: map[string]any{
127+
"inputs": []any{},
128+
},
129+
Result: map[string]any{
130+
"inputs": []any{},
131+
},
132+
},
133+
{
134+
Name: "mismatch output",
135+
RawConfig: map[string]any{
136+
"outputs": map[string]any{
137+
"default": map[string]any{
138+
"type": "elasticsearch",
139+
"headers": map[string]any{
140+
"X-App-Auth": "token-123",
141+
},
142+
},
143+
},
144+
},
145+
ChangeConfig: map[string]any{
146+
"outputs": map[string]any{
147+
"elasticsearch": map[string]any{
148+
"type": "elasticsearch",
149+
},
150+
},
151+
},
152+
Result: map[string]any{
153+
"outputs": map[string]any{
154+
"elasticsearch": map[string]any{
155+
"type": "elasticsearch",
156+
},
157+
},
158+
},
159+
},
160+
{
161+
Name: "simple merge",
162+
RawConfig: map[string]any{
163+
"outputs": map[string]any{
164+
"default": map[string]any{
165+
"type": "elasticsearch",
166+
"headers": map[string]any{
167+
"X-App-Auth": "token-123",
168+
},
169+
},
170+
},
171+
},
172+
ChangeConfig: map[string]any{
173+
"outputs": map[string]any{
174+
"default": map[string]any{
175+
"type": "elasticsearch",
176+
},
177+
},
178+
},
179+
Result: map[string]any{
180+
"outputs": map[string]any{
181+
"default": map[string]any{
182+
"type": "elasticsearch",
183+
"headers": map[string]any{
184+
"X-App-Auth": "token-123",
185+
},
186+
},
187+
},
188+
},
189+
},
190+
{
191+
Name: "simple merge array",
192+
RawConfig: map[string]any{
193+
"outputs": map[string]any{
194+
"default": map[string]any{
195+
"type": "elasticsearch",
196+
"headers": map[string]any{
197+
"X-App-Auth": "token-123",
198+
},
199+
},
200+
},
201+
},
202+
ChangeConfig: map[string]any{
203+
"outputs": map[string]any{
204+
"default": map[string]any{
205+
"type": "elasticsearch",
206+
"headers": map[string]any{
207+
"X-Other-Field": "field-123",
208+
},
209+
},
210+
},
211+
},
212+
Result: map[string]any{
213+
"outputs": map[string]any{
214+
"default": map[string]any{
215+
"type": "elasticsearch",
216+
"headers": map[string]any{
217+
"X-App-Auth": "token-123",
218+
"X-Other-Field": "field-123",
219+
},
220+
},
221+
},
222+
},
223+
},
224+
{
225+
Name: "override setting from change",
226+
RawConfig: map[string]any{
227+
"outputs": map[string]any{
228+
"default": map[string]any{
229+
"type": "elasticsearch",
230+
"headers": map[string]any{
231+
"X-App-Auth": "token-123",
232+
},
233+
},
234+
},
235+
},
236+
ChangeConfig: map[string]any{
237+
"outputs": map[string]any{
238+
"default": map[string]any{
239+
"type": "kafka",
240+
"headers": map[string]any{
241+
"X-App-Auth": "token-546",
242+
},
243+
},
244+
},
245+
},
246+
Result: map[string]any{
247+
"outputs": map[string]any{
248+
"default": map[string]any{
249+
"type": "kafka",
250+
"headers": map[string]any{
251+
"X-App-Auth": "token-546",
252+
},
253+
},
254+
},
255+
},
256+
},
257+
{
258+
Name: "setting variables are not expanded",
259+
RawConfig: map[string]any{
260+
"outputs": map[string]any{
261+
"default": map[string]any{
262+
"type": "elasticsearch",
263+
"headers": map[string]any{
264+
"X-App-Auth": "${filesource.app_token}",
265+
},
266+
},
267+
},
268+
},
269+
ChangeConfig: map[string]any{
270+
"outputs": map[string]any{
271+
"default": map[string]any{
272+
"type": "kafka",
273+
"headers": map[string]any{
274+
"X-App-Other": "${filesource.other_token}",
275+
},
276+
},
277+
},
278+
},
279+
Result: map[string]any{
280+
"outputs": map[string]any{
281+
"default": map[string]any{
282+
"type": "kafka",
283+
"headers": map[string]any{
284+
"X-App-Auth": "${filesource.app_token}",
285+
"X-App-Other": "${filesource.other_token}",
286+
},
287+
},
288+
},
289+
},
290+
},
291+
}
292+
for _, scenario := range scenarios {
293+
t.Run(scenario.Name, func(t *testing.T) {
294+
log, _ := loggertest.New(t.Name())
295+
rawConfig := config.MustNewConfigFrom(scenario.RawConfig)
296+
cc := &mockConfigChange{c: config.MustNewConfigFrom(scenario.ChangeConfig)}
297+
observed := injectOutputOverrides(log, rawConfig)(cc).Config()
298+
observedMap, err := observed.ToMapStr()
299+
require.NoError(t, err)
300+
assert.Equal(t, scenario.Result, observedMap)
301+
})
302+
}
303+
}

internal/pkg/agent/application/coordinator/config_patcher.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ type ConfigPatch func(change ConfigChange) ConfigChange
1212

1313
// ConfigPatchManager is a decorator to restore some agent settings from the elastic agent configuration file
1414
type ConfigPatchManager struct {
15-
inner ConfigManager
16-
outCh chan ConfigChange
17-
patchFn ConfigPatch
15+
inner ConfigManager
16+
outCh chan ConfigChange
17+
patchFns []ConfigPatch
1818
}
1919

2020
func (c ConfigPatchManager) Run(ctx context.Context) error {
@@ -36,14 +36,17 @@ func (c ConfigPatchManager) Watch() <-chan ConfigChange {
3636

3737
func (c ConfigPatchManager) patch(src <-chan ConfigChange, dst chan ConfigChange) {
3838
for ccc := range src {
39-
dst <- c.patchFn(ccc)
39+
for _, patchFn := range c.patchFns {
40+
ccc = patchFn(ccc)
41+
}
42+
dst <- ccc
4043
}
4144
}
4245

43-
func NewConfigPatchManager(inner ConfigManager, pf ConfigPatch) *ConfigPatchManager {
46+
func NewConfigPatchManager(inner ConfigManager, pfs ...ConfigPatch) *ConfigPatchManager {
4447
return &ConfigPatchManager{
45-
inner: inner,
46-
outCh: make(chan ConfigChange),
47-
patchFn: pf,
48+
inner: inner,
49+
outCh: make(chan ConfigChange),
50+
patchFns: pfs,
4851
}
4952
}

0 commit comments

Comments
 (0)