Skip to content

Commit c266d6f

Browse files
committed
Add ability for local output configuration to override policy configuration.
1 parent 5c5b174 commit c266d6f

File tree

4 files changed

+267
-15
lines changed

4 files changed

+267
-15
lines changed

internal/pkg/agent/application/application.go

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package application
77
import (
88
"context"
99
"fmt"
10+
"github.com/elastic/go-ucfg"
1011
"time"
1112

1213
"go.elastic.co/apm/v2"
@@ -215,7 +216,7 @@ func New(
215216
if err != nil {
216217
return nil, nil, nil, err
217218
}
218-
configMgr = coordinator.NewConfigPatchManager(managed, PatchAPMConfig(log, rawConfig))
219+
configMgr = coordinator.NewConfigPatchManager(managed, injectOutputOverrides(log, rawConfig), PatchAPMConfig(log, rawConfig))
219220
}
220221
}
221222

@@ -305,3 +306,70 @@ func mergeFleetConfig(ctx context.Context, rawConfig *config.Config) (storage.St
305306

306307
return store, cfg, nil
307308
}
309+
310+
// injectOutputOverrides takes local configuration for specific outputs and applies them to the configuration.
311+
//
312+
// The name of the output must match or no options will be overwritten.
313+
func injectOutputOverrides(log *logger.Logger, rawConfig *config.Config) func(change coordinator.ConfigChange) coordinator.ConfigChange {
314+
// merging uses no resolving as the AST variable substitution occurs on the outputs
315+
// append the values to arrays (don't allow complete overriding of arrays)
316+
mergeOpts := config.NoResolveOptions
317+
mergeOpts = append(mergeOpts, ucfg.AppendValues)
318+
319+
// parse the outputs defined local in the configuration
320+
// in the case the configuration as no outputs defined (most cases) then noop can be used
321+
var parsed struct {
322+
Outputs map[string]*ucfg.Config `config:"outputs"`
323+
}
324+
err := rawConfig.UnpackTo(&parsed)
325+
if err != nil {
326+
log.Errorf("error decoding raw config, output injection disabled: %v", err)
327+
return noop
328+
}
329+
if len(parsed.Outputs) == 0 {
330+
return noop
331+
}
332+
333+
return func(change coordinator.ConfigChange) coordinator.ConfigChange {
334+
cfg := change.Config()
335+
outputs, err := cfg.Agent.Child("outputs", -1)
336+
if err != nil {
337+
// no outputs in configuration; do nothing
338+
return change
339+
}
340+
for outputName, outputOverrides := range parsed.Outputs {
341+
cfgOutput, err := outputs.Child(outputName, -1)
342+
if err != nil {
343+
// no output with that name; do nothing
344+
continue
345+
}
346+
// the order of merging is important
347+
//
348+
// this merges the ConfigChange on-top of the rawConfig to ensure that the
349+
// ConfigChange options always override local options
350+
//
351+
// meaning that local options are only applied in the case that the ConfigChange
352+
// doesn't provide a different value for those fields
353+
err = func() error {
354+
clone, err := ucfg.NewFrom(outputOverrides, mergeOpts...)
355+
if err != nil {
356+
return fmt.Errorf("failed to clone output overrides: %w", err)
357+
}
358+
err = clone.Merge(cfgOutput, mergeOpts...)
359+
if err != nil {
360+
return fmt.Errorf("failed to merge output over overrides: %w", err)
361+
}
362+
err = outputs.SetChild(outputName, -1, clone, mergeOpts...)
363+
if err != nil {
364+
return fmt.Errorf("failed to re-set output with overrides: %w", err)
365+
}
366+
return nil
367+
}()
368+
if err != nil {
369+
log.Errorf("failed to perform output injection for output %s: %v", outputName, err)
370+
continue
371+
}
372+
}
373+
return change
374+
}
375+
}

internal/pkg/agent/application/application_test.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,184 @@ func TestLimitsLog(t *testing.T) {
8686
logs := obs.FilterMessageSnippet(expLogLine)
8787
require.Equalf(t, 1, logs.Len(), "expected one log message about limits change")
8888
}
89+
90+
func TestInjectOutputOverrides(t *testing.T) {
91+
scenarios := []struct {
92+
Name string
93+
RawConfig map[string]any
94+
ChangeConfig map[string]any
95+
Result map[string]any
96+
}{
97+
{
98+
Name: "rawConfig no outputs",
99+
RawConfig: map[string]any{
100+
"inputs": []any{},
101+
},
102+
ChangeConfig: map[string]any{
103+
"outputs": map[string]any{
104+
"default": map[string]any{
105+
"type": "elasticsearch",
106+
},
107+
},
108+
},
109+
Result: map[string]any{
110+
"outputs": map[string]any{
111+
"default": map[string]any{
112+
"type": "elasticsearch",
113+
},
114+
},
115+
},
116+
},
117+
{
118+
Name: "change config no outputs",
119+
RawConfig: map[string]any{
120+
"outputs": map[string]any{
121+
"default": map[string]any{
122+
"type": "elasticsearch",
123+
},
124+
},
125+
},
126+
ChangeConfig: map[string]any{
127+
"inputs": []any{},
128+
},
129+
Result: map[string]any{
130+
"inputs": []any{},
131+
},
132+
},
133+
{
134+
Name: "mismatch output",
135+
RawConfig: map[string]any{
136+
"outputs": map[string]any{
137+
"default": map[string]any{
138+
"type": "elasticsearch",
139+
"headers": map[string]any{
140+
"X-App-Auth": "token-123",
141+
},
142+
},
143+
},
144+
},
145+
ChangeConfig: map[string]any{
146+
"outputs": map[string]any{
147+
"elasticsearch": map[string]any{
148+
"type": "elasticsearch",
149+
},
150+
},
151+
},
152+
Result: map[string]any{
153+
"outputs": map[string]any{
154+
"elasticsearch": map[string]any{
155+
"type": "elasticsearch",
156+
},
157+
},
158+
},
159+
},
160+
{
161+
Name: "simple merge",
162+
RawConfig: map[string]any{
163+
"outputs": map[string]any{
164+
"default": map[string]any{
165+
"type": "elasticsearch",
166+
"headers": map[string]any{
167+
"X-App-Auth": "token-123",
168+
},
169+
},
170+
},
171+
},
172+
ChangeConfig: map[string]any{
173+
"outputs": map[string]any{
174+
"default": map[string]any{
175+
"type": "elasticsearch",
176+
},
177+
},
178+
},
179+
Result: map[string]any{
180+
"outputs": map[string]any{
181+
"default": map[string]any{
182+
"type": "elasticsearch",
183+
"headers": map[string]any{
184+
"X-App-Auth": "token-123",
185+
},
186+
},
187+
},
188+
},
189+
},
190+
{
191+
Name: "simple merge array",
192+
RawConfig: map[string]any{
193+
"outputs": map[string]any{
194+
"default": map[string]any{
195+
"type": "elasticsearch",
196+
"headers": map[string]any{
197+
"X-App-Auth": "token-123",
198+
},
199+
},
200+
},
201+
},
202+
ChangeConfig: map[string]any{
203+
"outputs": map[string]any{
204+
"default": map[string]any{
205+
"type": "elasticsearch",
206+
"headers": map[string]any{
207+
"X-Other-Field": "field-123",
208+
},
209+
},
210+
},
211+
},
212+
Result: map[string]any{
213+
"outputs": map[string]any{
214+
"default": map[string]any{
215+
"type": "elasticsearch",
216+
"headers": map[string]any{
217+
"X-App-Auth": "token-123",
218+
"X-Other-Field": "field-123",
219+
},
220+
},
221+
},
222+
},
223+
},
224+
{
225+
Name: "override setting from change",
226+
RawConfig: map[string]any{
227+
"outputs": map[string]any{
228+
"default": map[string]any{
229+
"type": "elasticsearch",
230+
"headers": map[string]any{
231+
"X-App-Auth": "token-123",
232+
},
233+
},
234+
},
235+
},
236+
ChangeConfig: map[string]any{
237+
"outputs": map[string]any{
238+
"default": map[string]any{
239+
"type": "kafka",
240+
"headers": map[string]any{
241+
"X-App-Auth": "token-546",
242+
},
243+
},
244+
},
245+
},
246+
Result: map[string]any{
247+
"outputs": map[string]any{
248+
"default": map[string]any{
249+
"type": "kafka",
250+
"headers": map[string]any{
251+
"X-App-Auth": "token-546",
252+
},
253+
},
254+
},
255+
},
256+
},
257+
}
258+
for _, scenario := range scenarios {
259+
t.Run(scenario.Name, func(t *testing.T) {
260+
log, _ := loggertest.New(t.Name())
261+
rawConfig := config.MustNewConfigFrom(scenario.RawConfig)
262+
cc := &mockConfigChange{c: config.MustNewConfigFrom(scenario.ChangeConfig)}
263+
observed := injectOutputOverrides(log, rawConfig)(cc).Config()
264+
observedMap, err := observed.ToMapStr()
265+
require.NoError(t, err)
266+
assert.Equal(t, scenario.Result, observedMap)
267+
})
268+
}
269+
}

internal/pkg/agent/application/coordinator/config_patcher.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ type ConfigPatch func(change ConfigChange) ConfigChange
1212

1313
// ConfigPatchManager is a decorator to restore some agent settings from the elastic agent configuration file
1414
type ConfigPatchManager struct {
15-
inner ConfigManager
16-
outCh chan ConfigChange
17-
patchFn ConfigPatch
15+
inner ConfigManager
16+
outCh chan ConfigChange
17+
patchFns []ConfigPatch
1818
}
1919

2020
func (c ConfigPatchManager) Run(ctx context.Context) error {
@@ -36,14 +36,17 @@ func (c ConfigPatchManager) Watch() <-chan ConfigChange {
3636

3737
func (c ConfigPatchManager) patch(src <-chan ConfigChange, dst chan ConfigChange) {
3838
for ccc := range src {
39-
dst <- c.patchFn(ccc)
39+
for _, patchFn := range c.patchFns {
40+
ccc = patchFn(ccc)
41+
}
42+
dst <- ccc
4043
}
4144
}
4245

43-
func NewConfigPatchManager(inner ConfigManager, pf ConfigPatch) *ConfigPatchManager {
46+
func NewConfigPatchManager(inner ConfigManager, pfs ...ConfigPatch) *ConfigPatchManager {
4447
return &ConfigPatchManager{
45-
inner: inner,
46-
outCh: make(chan ConfigChange),
47-
patchFn: pf,
48+
inner: inner,
49+
outCh: make(chan ConfigChange),
50+
patchFns: pfs,
4851
}
4952
}

internal/pkg/config/config.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func VarSkipKeys(keys ...string) Option {
4444
}
4545
}
4646

47-
// noResolveOptions don't do any resolve of variables.
48-
var noResolveOptions = []ucfg.Option{
47+
// NoResolveOptions don't do any resolve of variables.
48+
var NoResolveOptions = []ucfg.Option{
4949
ucfg.PathSep("."),
5050
ucfg.IgnoreCommas,
5151
ucfg.ResolveNOOP,
@@ -137,7 +137,7 @@ func NewConfigFrom(from interface{}, opts ...interface{}) (*Config, error) {
137137
return nil, err
138138
}
139139
if len(skippedKeys) > 0 {
140-
err = cfg.Merge(skippedKeys, noResolveOptions...)
140+
err = cfg.Merge(skippedKeys, NoResolveOptions...)
141141
if err != nil {
142142
return nil, err
143143
}
@@ -238,14 +238,14 @@ func (c *Config) ToMapStr(opts ...interface{}) (map[string]interface{}, error) {
238238
var subUnpacked interface{}
239239
if subCfg.IsDict() {
240240
var subDict map[string]interface{}
241-
err = subCfg.Unpack(&subDict, noResolveOptions...)
241+
err = subCfg.Unpack(&subDict, NoResolveOptions...)
242242
if err != nil {
243243
return nil, fmt.Errorf("error unpacking subdict object in config for skip key %s: %w", skip, err)
244244
}
245245
subUnpacked = subDict
246246
} else if subCfg.IsArray() {
247247
var subArr []interface{}
248-
err = subCfg.Unpack(&subArr, noResolveOptions...)
248+
err = subCfg.Unpack(&subArr, NoResolveOptions...)
249249
if err != nil {
250250
return nil, fmt.Errorf("error unpacking subarray in config for skip key %s: %w ", skip, err)
251251
}
@@ -273,7 +273,7 @@ func (c *Config) ToMapStr(opts ...interface{}) (map[string]interface{}, error) {
273273
m[k] = v
274274
}
275275
if len(skippedKeysOrig) > 0 {
276-
err := c.access().Merge(skippedKeysOrig, noResolveOptions...)
276+
err := c.access().Merge(skippedKeysOrig, NoResolveOptions...)
277277
if err != nil {
278278
return nil, fmt.Errorf("error merging config with skipped key config: %w", err)
279279
}

0 commit comments

Comments
 (0)