Skip to content

Commit

Permalink
[discovery] Move continuous discovery config parts to static files (#…
Browse files Browse the repository at this point in the history
…5381)

We cannot assume that ${SPLUNK_ACCESS_TOKEN} and ${SPLUNK_REALM} are aways set. It's not the case for the helm chart. We should move static config part of continuous discovery from discoverer to the config files. Having it in the `agent_config.yaml` has no overhead until the continuous discovery is enabled.
  • Loading branch information
dmitryax authored Sep 17, 2024
1 parent 6a5d42a commit b05792c
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 28 deletions.
11 changes: 11 additions & 0 deletions cmd/otelcol/config/collector/agent_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ receivers:
#access_token_passthrough: true
zipkin:
endpoint: "${SPLUNK_LISTEN_INTERFACE}:9411"
nop:

processors:
batch:
Expand Down Expand Up @@ -137,6 +138,11 @@ exporters:
#ingest_url: http://${SPLUNK_GATEWAY_URL}:9943
sync_host_metadata: true
correlation:
# Entities (applicable only if discovery mode is enabled)
otlphttp/entities:
logs_endpoint: "${SPLUNK_INGEST_URL}/v3/event"
headers:
"X-SF-Token": "${SPLUNK_ACCESS_TOKEN}"
# Logs
splunk_hec:
token: "${SPLUNK_HEC_TOKEN}"
Expand Down Expand Up @@ -190,6 +196,11 @@ service:
receivers: [signalfx, smartagent/processlist]
processors: [memory_limiter, batch, resourcedetection]
exporters: [signalfx]
logs/entities:
# Receivers are dynamically added if discovery mode is enabled
receivers: [nop]
processors: [memory_limiter, batch, resourcedetection]
exporters: [otlphttp/entities]
logs:
receivers: [fluentforward, otlp]
processors:
Expand Down
11 changes: 11 additions & 0 deletions cmd/otelcol/config/collector/upstream_agent_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ receivers:
endpoint: 0.0.0.0:9943
zipkin:
endpoint: 0.0.0.0:9411
nop:

processors:
batch:
Expand Down Expand Up @@ -132,6 +133,11 @@ exporters:
#ingest_url: http://${SPLUNK_GATEWAY_URL}:9943
sync_host_metadata: true
correlation:
# Entities (applicable only if discovery mode is enabled)
otlphttp/entities:
logs_endpoint: "${SPLUNK_INGEST_URL}/v3/event"
headers:
"X-SF-Token": "${SPLUNK_ACCESS_TOKEN}"
splunk_hec/profiling:
token: "${SPLUNK_ACCESS_TOKEN}"
endpoint: "https://ingest.${SPLUNK_REALM}.signalfx.com/v1/log"
Expand Down Expand Up @@ -180,6 +186,11 @@ service:
exporters: [signalfx]
# Use instead when sending to gateway
#exporters: [otlp]
logs/entities:
# Receivers are dynamically added if discovery mode is enabled
receivers: [nop]
processors: [memory_limiter, batch, resourcedetection]
exporters: [otlphttp/entities]
# Required for profiling
logs:
receivers: [otlp]
Expand Down
38 changes: 30 additions & 8 deletions internal/configconverter/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package configconverter
import (
"context"
"fmt"
"strings"

"go.opentelemetry.io/collector/confmap"

Expand Down Expand Up @@ -58,7 +59,13 @@ func SetupDiscovery(_ context.Context, in *confmap.Conf) error {
service["extensions"] = appendUnique(serviceExtensions, discoExtensions)
}

metricsPipeline, metricsReceivers, err := getMetricsPipelineAndReceivers(service)
pipelines := map[string]any{}
if pl, ok := service["pipelines"]; ok && pl != nil {
pipelines = pl.(map[string]any)
}
service["pipelines"] = pipelines

metricsPipeline, metricsReceivers, err := getMetricsPipelineAndReceivers(pipelines)
if err != nil {
return err
}
Expand All @@ -67,6 +74,8 @@ func SetupDiscovery(_ context.Context, in *confmap.Conf) error {
metricsPipeline["receivers"] = appendUnique(metricsReceivers, discoReceivers)
}

setEntitiesPipelineReceivers(pipelines, discoReceivers)

setAutoDiscoveryResourceAttribute(service)

*in = *confmap.NewFromStringMap(out)
Expand Down Expand Up @@ -135,13 +144,7 @@ func getDiscoReceivers(service map[string]any) (bool, []any, error) {
return isSet, receivers, nil
}

func getMetricsPipelineAndReceivers(service map[string]any) (map[string]any, []any, error) {
pipelines := map[string]any{}
if pl, ok := service["pipelines"]; ok && pl != nil {
pipelines = pl.(map[string]any)
}
service["pipelines"] = pipelines

func getMetricsPipelineAndReceivers(pipelines map[string]any) (map[string]any, []any, error) {
metricsPipeline := map[string]any{}
if mp, ok := pipelines["metrics"]; ok && mp != nil {
metricsPipeline = mp.(map[string]any)
Expand All @@ -158,6 +161,25 @@ func getMetricsPipelineAndReceivers(service map[string]any) (map[string]any, []a
return metricsPipeline, metricsReceivers, nil
}

func setEntitiesPipelineReceivers(pipelines map[string]any, discoReceivers []any) {
ep, ok := pipelines["logs/entities"].(map[string]any)
if !ok {
// Entities pipeline not set, nothing to do.
return
}

var receivers []any
if existing, ok := ep["receivers"]; ok && existing != nil {
receivers = existing.([]any)
}
for _, r := range discoReceivers {
if strings.HasPrefix(r.(string), "discovery") {
receivers = appendUnique(receivers, []any{r})
}
}
ep["receivers"] = receivers
}

func appendUnique(serviceComponents []any, discoComponents []any) []any {
existing := map[any]struct{}{}
for _, e := range serviceComponents {
Expand Down
81 changes: 81 additions & 0 deletions internal/configconverter/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,87 @@ func TestDiscoveryEmptyReceivers(t *testing.T) {
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
}

func TestContinuousDiscoveryNoEntitiesPipeline(t *testing.T) {
in := confFromYaml(t, `service:
pipelines:
metrics:
processors: [proc/one, proc/two, proc/three]
exporters: [exp/one, exp/two, exp/three]
metrics/untouched:
receivers: [recv/six, recv/seven, recv/eight]
processors: [proc/six, proc/seven, proc/eight]
exporters: [exp/six, exp/seven, exp/eight]
logs/untouched:
receivers: [recv/six]
processors: [proc/six]
exporters: [exp/six]
receivers/splunk.discovery: [discovery/host_observer]
`)

expected := confFromYaml(t, `service:
pipelines:
metrics:
receivers: [discovery/host_observer]
processors: [proc/one, proc/two, proc/three]
exporters: [exp/one, exp/two, exp/three]
metrics/untouched:
receivers: [recv/six, recv/seven, recv/eight]
processors: [proc/six, proc/seven, proc/eight]
exporters: [exp/six, exp/seven, exp/eight]
logs/untouched:
receivers: [recv/six]
processors: [proc/six]
exporters: [exp/six]
telemetry:
resource:
splunk_autodiscovery: "true"
`)

require.NoError(t, SetupDiscovery(context.Background(), in))
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
}

func TestContinuousDiscoveryWithEntitiesPipeline(t *testing.T) {
in := confFromYaml(t, `service:
pipelines:
metrics:
receivers: [recv/one]
processors: [proc/one, proc/two, proc/three]
exporters: [exp/one, exp/two, exp/three]
logs/entities:
receivers: [recv/one, recv/two]
processors: [proc/one, proc/two]
exporters: [exp/one, exp/two]
logs/untouched:
receivers: [recv/three]
processors: [proc/three]
exporters: [exp/three]
receivers/splunk.discovery: [discovery/one, discovery/two]
`)

expected := confFromYaml(t, `service:
pipelines:
metrics:
receivers: [recv/one, discovery/one, discovery/two]
processors: [proc/one, proc/two, proc/three]
exporters: [exp/one, exp/two, exp/three]
logs/entities:
receivers: [recv/one, recv/two, discovery/one, discovery/two]
processors: [proc/one, proc/two]
exporters: [exp/one, exp/two]
logs/untouched:
receivers: [recv/three]
processors: [proc/three]
exporters: [exp/three]
telemetry:
resource:
splunk_autodiscovery: "true"
`)

require.NoError(t, SetupDiscovery(context.Background(), in))
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
}

func confFromYaml(t testing.TB, content string) *confmap.Conf {
var conf map[string]any
if err := yaml.Unmarshal([]byte(content), &conf); err != nil {
Expand Down
15 changes: 0 additions & 15 deletions internal/confmapprovider/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,24 +352,9 @@ func (d *discoverer) continuousDiscoveryConfig(cfg *Config, discoveryReceiversCo
dCfg := map[string]any{
"extensions": extensions,
"receivers": discoveryReceiversConfigs,
"exporters": map[string]any{
"otlphttp/entities": map[string]any{
"logs_endpoint": "https://ingest.${SPLUNK_REALM}.signalfx.com/v3/event",
"headers": map[string]any{
"X-SF-Token": "${SPLUNK_ACCESS_TOKEN}",
},
},
},
"service": map[string]any{
discovery.DiscoReceiversKey: receiverIDs,
discovery.DiscoExtensionsKey: observerIDs,
"pipelines": map[string]any{
"logs/entities": map[string]any{
"receivers": receiverIDs,
// TODO: add processors dynamically in the converter: memory_limiter, resource (in k8s only).
"exporters": []string{"otlphttp/entities"},
},
},
},
}
d.logger.Debug("determined continuous discovery config", zap.Any("config", dCfg))
Expand Down
11 changes: 7 additions & 4 deletions internal/confmapprovider/discovery/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/envprovider"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/otelcol"
Expand Down Expand Up @@ -110,18 +111,20 @@ func TestConfigDProviderInvalidURIs(t *testing.T) {

func TestDiscoveryProvider_ContinuousDiscoveryConfig(t *testing.T) {
require.NoError(t, featuregate.GlobalRegistry().Set(continuousDiscoveryFGKey, true))
t.Setenv("SPLUNK_REALM", "fake-realm")
t.Setenv("SPLUNK_INGEST_URL", "https://ingest.fake-realm.signalfx.com")
t.Setenv("SPLUNK_ACCESS_TOKEN", "fake-token")

confmapProvider, err := New()
require.NoError(t, err)

cfgDir, err := filepath.Abs(filepath.Join(".", "testdata", "config.d"))
require.NoError(t, err)
provider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{fmt.Sprintf("%s:%s", discoveryModeScheme, cfgDir)},
URIs: []string{
fmt.Sprintf("file:%s", filepath.Join("testdata", "base-config.yaml")),
fmt.Sprintf("%s:%s", discoveryModeScheme, filepath.Join("testdata", "config.d")),
},
ProviderFactories: []confmap.ProviderFactory{
fileprovider.NewFactory(),
confmapProvider.DiscoveryModeProviderFactory(),
envprovider.NewFactory(),
},
Expand Down
9 changes: 9 additions & 0 deletions internal/confmapprovider/discovery/testdata/base-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
exporters:
otlphttp/entities:
logs_endpoint: "${SPLUNK_INGEST_URL}/v3/event"
headers:
"X-SF-Token": "${SPLUNK_ACCESS_TOKEN}"
service:
pipelines:
logs/entities:
exporters: [otlphttp/entities]
14 changes: 13 additions & 1 deletion tests/general/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ func TestDefaultAgentConfig(t *testing.T) {
"token": "<redacted>",
"log_data_enabled": false,
},
"otlphttp/entities": map[string]any{
"logs_endpoint": "https://ingest.not.real.signalfx.com/v3/event",
"headers": map[string]any{
"X-SF-Token": "<redacted>",
},
},
},
"extensions": map[string]any{
"health_check": map[string]any{"endpoint": fmt.Sprintf("%s:13133", ip)},
Expand Down Expand Up @@ -362,7 +368,8 @@ func TestDefaultAgentConfig(t *testing.T) {
},
"signalfx": map[string]any{"endpoint": fmt.Sprintf("%s:9943", ip)},
"smartagent/processlist": map[string]any{"type": "processlist"},
"zipkin": map[string]any{"endpoint": fmt.Sprintf("%s:9411", ip)}},
"zipkin": map[string]any{"endpoint": fmt.Sprintf("%s:9411", ip)},
"nop": nil},
"service": map[string]any{
"telemetry": map[string]any{"metrics": map[string]any{"address": fmt.Sprintf("%s:8888", ip)}},
"extensions": []any{"health_check", "http_forwarder", "zpages", "smartagent"},
Expand All @@ -388,6 +395,11 @@ func TestDefaultAgentConfig(t *testing.T) {
"processors": []any{"memory_limiter", "batch", "resourcedetection"},
"receivers": []any{"jaeger", "otlp", "zipkin"},
},
"logs/entities": map[string]any{
"receivers": []any{"nop"},
"processors": []any{"memory_limiter", "batch", "resourcedetection"},
"exporters": []any{"otlphttp/entities"},
},
},
},
}, config)
Expand Down

0 comments on commit b05792c

Please sign in to comment.