From 1fcd7747d21c2294e4941e625f2791a78c1078e0 Mon Sep 17 00:00:00 2001 From: Ashvitha Date: Wed, 2 Aug 2023 13:38:00 -0400 Subject: [PATCH] Backport of [HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration into release/1.15.x (#18360) [HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration (#18168) * OTELExporter now uses an EndpointProvider to discover the endpoint * OTELSink uses a ConfigProvider to obtain filters and labels configuration * improve tests for otel_sink * Regex logic is moved into client for a method on the TelemetryConfig object * Create a telemetry_config_provider and update deps to use it * Fix conversion * fix import newline * Add logger to hcp client and move telemetry_config out of the client.go file * Add a telemetry_config.go to refactor client.go * Update deps * update hcp deps test * Modify telemetry_config_providers * Check for nil filters * PR review updates * Fix comments and move around pieces * Fix comments * Remove context from client struct * Moved ctx out of sink struct and fixed filters, added a test * Remove named imports, use errors.New if not formatting * Remove HCP dependencies in telemetry package * Add success metric and move lock only to grab the t.cfgHash * Update hash * fix nits * Create an equals method and add tests * Improve telemetry_config_provider.go tests * Add race test * Add missing godoc * Remove mock for MetricsClient * Avoid goroutine test panics * trying to kick CI lint issues by upgrading mod * improve test code and add hasher for testing * Use structure logging for filters, fix error constants, and default to allow all regex * removed hashing and modify logic to simplify * Improve race test and fix PR feedback by removing hash equals and avoid testing the timer.Ticker logic, and instead unit test * Ran make go-mod-tidy * Use errtypes in the test * Add changelog * add safety check for exporter endpoint * remove require.Contains by using error types, fix structure logging, and fix 
success metric typo in exporter * Fixed race test to have changing config values * Send success metric before modifying config * Avoid the defer and move the success metric under --- .changelog/18168.txt | 3 + agent/hcp/client/client.go | 80 +---- agent/hcp/client/client_test.go | 240 +++++--------- agent/hcp/client/metrics_client.go | 8 +- agent/hcp/client/mock_metrics_client.go | 5 - agent/hcp/client/telemetry_config.go | 176 ++++++++++ agent/hcp/client/telemetry_config_test.go | 377 +++++++++++++++++++++ agent/hcp/deps.go | 43 ++- agent/hcp/deps_test.go | 97 +++--- agent/hcp/manager_test.go | 12 +- agent/hcp/telemetry/custom_metrics.go | 2 +- agent/hcp/telemetry/filter.go | 37 --- agent/hcp/telemetry/filter_test.go | 58 ---- agent/hcp/telemetry/otel_exporter.go | 35 +- agent/hcp/telemetry/otel_exporter_test.go | 41 ++- agent/hcp/telemetry/otel_sink.go | 103 +++--- agent/hcp/telemetry/otel_sink_test.go | 229 ++++++++++--- agent/hcp/telemetry_provider.go | 156 +++++++++ agent/hcp/telemetry_provider_test.go | 384 ++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- 21 files changed, 1575 insertions(+), 517 deletions(-) create mode 100644 .changelog/18168.txt delete mode 100644 agent/hcp/client/mock_metrics_client.go create mode 100644 agent/hcp/client/telemetry_config.go create mode 100644 agent/hcp/client/telemetry_config_test.go delete mode 100644 agent/hcp/telemetry/filter.go delete mode 100644 agent/hcp/telemetry/filter_test.go create mode 100644 agent/hcp/telemetry_provider.go create mode 100644 agent/hcp/telemetry_provider_test.go diff --git a/.changelog/18168.txt b/.changelog/18168.txt new file mode 100644 index 000000000000..a68483527e10 --- /dev/null +++ b/.changelog/18168.txt @@ -0,0 +1,3 @@ +```release-note:improvement +hcp: Add dynamic configuration support for the export of server metrics to HCP. 
+``` \ No newline at end of file diff --git a/agent/hcp/client/client.go b/agent/hcp/client/client.go index 1c49fd792471..f04767e983c7 100644 --- a/agent/hcp/client/client.go +++ b/agent/hcp/client/client.go @@ -35,21 +35,6 @@ type Client interface { DiscoverServers(ctx context.Context) ([]string, error) } -// MetricsConfig holds metrics specific configuration for the TelemetryConfig. -// The endpoint field overrides the TelemetryConfig endpoint. -type MetricsConfig struct { - Filters []string - Endpoint string -} - -// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers -// to the HCP Telemetry gateway. -type TelemetryConfig struct { - Endpoint string - Labels map[string]string - MetricsConfig *MetricsConfig -} - type BootstrapConfig struct { Name string BootstrapExpect int @@ -112,10 +97,14 @@ func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, resp, err := c.tgw.AgentTelemetryConfig(params, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fetch from HCP: %w", err) + } + + if err := validateAgentTelemetryConfigPayload(resp); err != nil { + return nil, fmt.Errorf("invalid response payload: %w", err) } - return convertTelemetryConfig(resp) + return convertAgentTelemetryResponse(ctx, resp, c.cfg) } func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) { @@ -272,60 +261,3 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) { return servers, nil } - -// convertTelemetryConfig validates the AgentTelemetryConfig payload and converts it into a TelemetryConfig object. 
-func convertTelemetryConfig(resp *hcptelemetry.AgentTelemetryConfigOK) (*TelemetryConfig, error) { - if resp.Payload == nil { - return nil, fmt.Errorf("missing payload") - } - - if resp.Payload.TelemetryConfig == nil { - return nil, fmt.Errorf("missing telemetry config") - } - - payloadConfig := resp.Payload.TelemetryConfig - var metricsConfig MetricsConfig - if payloadConfig.Metrics != nil { - metricsConfig.Endpoint = payloadConfig.Metrics.Endpoint - metricsConfig.Filters = payloadConfig.Metrics.IncludeList - } - return &TelemetryConfig{ - Endpoint: payloadConfig.Endpoint, - Labels: payloadConfig.Labels, - MetricsConfig: &metricsConfig, - }, nil -} - -// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved. -// It returns full metrics endpoint and true if a valid endpoint was obtained. -func (t *TelemetryConfig) Enabled() (string, bool) { - endpoint := t.Endpoint - if override := t.MetricsConfig.Endpoint; override != "" { - endpoint = override - } - - if endpoint == "" { - return "", false - } - - // The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added. - return endpoint + metricsGatewayPath, true -} - -// DefaultLabels returns a set of string pairs that must be added as attributes to all exported telemetry data. 
-func (t *TelemetryConfig) DefaultLabels(cfg config.CloudConfig) map[string]string { - labels := make(map[string]string) - nodeID := string(cfg.NodeID) - if nodeID != "" { - labels["node_id"] = nodeID - } - if cfg.NodeName != "" { - labels["node_name"] = cfg.NodeName - } - - for k, v := range t.Labels { - labels[k] = v - } - - return labels -} diff --git a/agent/hcp/client/client_test.go b/agent/hcp/client/client_test.go index 0292fa3fab22..d4bae2ae4cb5 100644 --- a/agent/hcp/client/client_test.go +++ b/agent/hcp/client/client_test.go @@ -2,200 +2,122 @@ package client import ( "context" + "fmt" + "net/url" + "regexp" "testing" + "time" - "github.com/hashicorp/consul/agent/hcp/config" - "github.com/hashicorp/consul/types" - "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service" + "github.com/go-openapi/runtime" + hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service" "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/models" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestFetchTelemetryConfig(t *testing.T) { - t.Parallel() - for name, test := range map[string]struct { - metricsEndpoint string - expect func(*MockClient) - disabled bool - }{ - "success": { - expect: func(mockClient *MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ - Endpoint: "https://test.com", - MetricsConfig: &MetricsConfig{ - Endpoint: "", - }, - }, nil) - }, - metricsEndpoint: "https://test.com/v1/metrics", - }, - "overrideMetricsEndpoint": { - expect: func(mockClient *MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ - Endpoint: "https://test.com", - MetricsConfig: &MetricsConfig{ - Endpoint: "https://test.com", - }, - }, nil) - }, - metricsEndpoint: 
"https://test.com/v1/metrics", - }, - "disabledWithEmptyEndpoint": { - expect: func(mockClient *MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ - Endpoint: "", - MetricsConfig: &MetricsConfig{ - Endpoint: "", - }, - }, nil) - }, - disabled: true, - }, - } { - test := test - t.Run(name, func(t *testing.T) { - t.Parallel() - - mock := NewMockClient(t) - test.expect(mock) - - telemetryCfg, err := mock.FetchTelemetryConfig(context.Background()) - require.NoError(t, err) - - if test.disabled { - endpoint, ok := telemetryCfg.Enabled() - require.False(t, ok) - require.Empty(t, endpoint) - return - } +type mockTGW struct { + mockResponse *hcptelemetry.AgentTelemetryConfigOK + mockError error +} - endpoint, ok := telemetryCfg.Enabled() +func (m *mockTGW) AgentTelemetryConfig(params *hcptelemetry.AgentTelemetryConfigParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.AgentTelemetryConfigOK, error) { + return m.mockResponse, m.mockError +} +func (m *mockTGW) GetLabelValues(params *hcptelemetry.GetLabelValuesParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.GetLabelValuesOK, error) { + return hcptelemetry.NewGetLabelValuesOK(), nil +} +func (m *mockTGW) QueryRangeBatch(params *hcptelemetry.QueryRangeBatchParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.QueryRangeBatchOK, error) { + return hcptelemetry.NewQueryRangeBatchOK(), nil +} +func (m *mockTGW) SetTransport(transport runtime.ClientTransport) {} - require.True(t, ok) - require.Equal(t, test.metricsEndpoint, endpoint) - }) - } +type expectedTelemetryCfg struct { + endpoint string + labels map[string]string + filters string + refreshInterval time.Duration } -func TestConvertTelemetryConfig(t *testing.T) { +func TestFetchTelemetryConfig(t *testing.T) { t.Parallel() - for name, test := range map[string]struct { - resp 
*consul_telemetry_service.AgentTelemetryConfigOK - expectedTelemetryCfg *TelemetryConfig - wantErr string + for name, tc := range map[string]struct { + mockResponse *hcptelemetry.AgentTelemetryConfigOK + mockError error + wantErr string + expected *expectedTelemetryCfg }{ - "success": { - resp: &consul_telemetry_service.AgentTelemetryConfigOK{ - Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ - TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ - Endpoint: "https://test.com", - Labels: map[string]string{"test": "test"}, - }, - }, - }, - expectedTelemetryCfg: &TelemetryConfig{ - Endpoint: "https://test.com", - Labels: map[string]string{"test": "test"}, - MetricsConfig: &MetricsConfig{}, + "errorsWithFetchFailure": { + mockError: fmt.Errorf("failed to fetch from HCP"), + mockResponse: nil, + wantErr: "failed to fetch from HCP", + }, + "errorsWithInvalidPayload": { + mockResponse: &hcptelemetry.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{}, }, + mockError: nil, + wantErr: "invalid response payload", }, - "successWithMetricsConfig": { - resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + "success:": { + mockResponse: &hcptelemetry.AgentTelemetryConfigOK{ Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{ + RefreshInterval: "1s", + }, TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ Endpoint: "https://test.com", - Labels: map[string]string{"test": "test"}, + Labels: map[string]string{"test": "123"}, Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{ - Endpoint: "https://metrics-test.com", - IncludeList: []string{"consul.raft.apply"}, + IncludeList: []string{"consul", "test"}, }, }, }, }, - expectedTelemetryCfg: &TelemetryConfig{ - Endpoint: "https://test.com", - Labels: 
map[string]string{"test": "test"}, - MetricsConfig: &MetricsConfig{ - Endpoint: "https://metrics-test.com", - Filters: []string{"consul.raft.apply"}, - }, - }, - }, - "errorsWithNilPayload": { - resp: &consul_telemetry_service.AgentTelemetryConfigOK{}, - wantErr: "missing payload", - }, - "errorsWithNilTelemetryConfig": { - resp: &consul_telemetry_service.AgentTelemetryConfigOK{ - Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{}, + expected: &expectedTelemetryCfg{ + endpoint: "https://test.com/v1/metrics", + labels: map[string]string{"test": "123"}, + filters: "consul|test", + refreshInterval: 1 * time.Second, }, - wantErr: "missing telemetry config", }, } { - test := test + tc := tc t.Run(name, func(t *testing.T) { t.Parallel() - telemetryCfg, err := convertTelemetryConfig(test.resp) - if test.wantErr != "" { + c := &hcpClient{ + tgw: &mockTGW{ + mockError: tc.mockError, + mockResponse: tc.mockResponse, + }, + } + + telemetryCfg, err := c.FetchTelemetryConfig(context.Background()) + + if tc.wantErr != "" { require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) require.Nil(t, telemetryCfg) - require.Contains(t, err.Error(), test.wantErr) return } + urlEndpoint, err := url.Parse(tc.expected.endpoint) require.NoError(t, err) - require.Equal(t, test.expectedTelemetryCfg, telemetryCfg) - }) - } -} -func Test_DefaultLabels(t *testing.T) { - for name, tc := range map[string]struct { - cfg config.CloudConfig - expectedLabels map[string]string - }{ - "Success": { - cfg: config.CloudConfig{ - NodeID: types.NodeID("nodeyid"), - NodeName: "nodey", - }, - expectedLabels: map[string]string{ - "node_id": "nodeyid", - "node_name": "nodey", - }, - }, + regexFilters, err := regexp.Compile(tc.expected.filters) + require.NoError(t, err) - "NoNodeID": { - cfg: config.CloudConfig{ - NodeID: types.NodeID(""), - NodeName: "nodey", - }, - expectedLabels: map[string]string{ - "node_name": "nodey", - }, - }, - "NoNodeName": { - cfg: 
config.CloudConfig{ - NodeID: types.NodeID("nodeyid"), - NodeName: "", - }, - expectedLabels: map[string]string{ - "node_id": "nodeyid", - }, - }, - "Empty": { - cfg: config.CloudConfig{ - NodeID: "", - NodeName: "", - }, - expectedLabels: map[string]string{}, - }, - } { - t.Run(name, func(t *testing.T) { - tCfg := &TelemetryConfig{} - labels := tCfg.DefaultLabels(tc.cfg) - require.Equal(t, labels, tc.expectedLabels) + expectedCfg := &TelemetryConfig{ + MetricsConfig: &MetricsConfig{ + Endpoint: urlEndpoint, + Filters: regexFilters, + Labels: tc.expected.labels, + }, + RefreshConfig: &RefreshConfig{ + RefreshInterval: tc.expected.refreshInterval, + }, + } + + require.NoError(t, err) + require.Equal(t, expectedCfg, telemetryCfg) }) } } diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index 0bcb90b81ce2..3c5b5c4fb9d6 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -18,6 +18,7 @@ import ( "golang.org/x/oauth2" "google.golang.org/protobuf/proto" + "github.com/hashicorp/consul/agent/hcp/telemetry" "github.com/hashicorp/consul/version" ) @@ -38,11 +39,6 @@ const ( defaultErrRespBodyLength = 100 ) -// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway. -type MetricsClient interface { - ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error -} - // cloudConfig represents cloud config for TLS abstracted in an interface for easy testing. type CloudConfig interface { HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) @@ -58,7 +54,7 @@ type otlpClient struct { // NewMetricsClient returns a configured MetricsClient. // The current implementation uses otlpClient to provide retry functionality. 
-func NewMetricsClient(ctx context.Context, cfg CloudConfig) (MetricsClient, error) { +func NewMetricsClient(ctx context.Context, cfg CloudConfig) (telemetry.MetricsClient, error) { if cfg == nil { return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)") } diff --git a/agent/hcp/client/mock_metrics_client.go b/agent/hcp/client/mock_metrics_client.go deleted file mode 100644 index a30b1f1c62c0..000000000000 --- a/agent/hcp/client/mock_metrics_client.go +++ /dev/null @@ -1,5 +0,0 @@ -package client - -type MockMetricsClient struct { - MetricsClient -} diff --git a/agent/hcp/client/telemetry_config.go b/agent/hcp/client/telemetry_config.go new file mode 100644 index 000000000000..55c226438030 --- /dev/null +++ b/agent/hcp/client/telemetry_config.go @@ -0,0 +1,176 @@ +package client + +import ( + "context" + "errors" + "fmt" + "net/url" + "regexp" + "strings" + "time" + + "github.com/hashicorp/go-hclog" + hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service" + + "github.com/hashicorp/consul/agent/hcp/config" +) + +var ( + // defaultMetricFilters is a regex that matches all metric names. + defaultMetricFilters = regexp.MustCompile(".+") + + // Validation errors for AgentTelemetryConfigOK response. + errMissingPayload = errors.New("missing payload") + errMissingTelemetryConfig = errors.New("missing telemetry config") + errMissingRefreshConfig = errors.New("missing refresh config") + errMissingMetricsConfig = errors.New("missing metrics config") + errInvalidRefreshInterval = errors.New("invalid refresh interval") + errInvalidEndpoint = errors.New("invalid metrics endpoint") +) + +// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers +// to the HCP Telemetry gateway. 
+type TelemetryConfig struct { + MetricsConfig *MetricsConfig + RefreshConfig *RefreshConfig +} + +// MetricsConfig holds metrics specific configuration within TelemetryConfig. +type MetricsConfig struct { + Labels map[string]string + Filters *regexp.Regexp + Endpoint *url.URL +} + +// RefreshConfig contains configuration for the periodic fetch of configuration from HCP. +type RefreshConfig struct { + RefreshInterval time.Duration +} + +// MetricsEnabled returns true if metrics export is enabled, i.e. a valid metrics endpoint exists. +func (t *TelemetryConfig) MetricsEnabled() bool { + return t.MetricsConfig.Endpoint != nil +} + +// validateAgentTelemetryConfigPayload ensures the returned payload from HCP is valid. +func validateAgentTelemetryConfigPayload(resp *hcptelemetry.AgentTelemetryConfigOK) error { + if resp.Payload == nil { + return errMissingPayload + } + + if resp.Payload.TelemetryConfig == nil { + return errMissingTelemetryConfig + } + + if resp.Payload.RefreshConfig == nil { + return errMissingRefreshConfig + } + + if resp.Payload.TelemetryConfig.Metrics == nil { + return errMissingMetricsConfig + } + + return nil +} + +// convertAgentTelemetryResponse converts an AgentTelemetryConfig payload into a TelemetryConfig object. 
+func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.AgentTelemetryConfigOK, cfg config.CloudConfig) (*TelemetryConfig, error) { + refreshInterval, err := time.ParseDuration(resp.Payload.RefreshConfig.RefreshInterval) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidRefreshInterval, err) + } + + telemetryConfig := resp.Payload.TelemetryConfig + metricsEndpoint, err := convertMetricEndpoint(telemetryConfig.Endpoint, telemetryConfig.Metrics.Endpoint) + if err != nil { + return nil, errInvalidEndpoint + } + + metricsFilters := convertMetricFilters(ctx, telemetryConfig.Metrics.IncludeList) + metricLabels := convertMetricLabels(telemetryConfig.Labels, cfg) + + return &TelemetryConfig{ + MetricsConfig: &MetricsConfig{ + Endpoint: metricsEndpoint, + Labels: metricLabels, + Filters: metricsFilters, + }, + RefreshConfig: &RefreshConfig{ + RefreshInterval: refreshInterval, + }, + }, nil +} + +// convertMetricEndpoint returns a url for the export of metrics, if a valid endpoint was obtained. +// It returns no error, and no url, if an empty endpoint is retrieved (server not registered with CCM). +// It returns an error, and no url, if a bad endpoint is retrieved. +func convertMetricEndpoint(telemetryEndpoint string, metricsEndpoint string) (*url.URL, error) { + // Telemetry endpoint overriden by metrics specific endpoint, if given. + endpoint := telemetryEndpoint + if metricsEndpoint != "" { + endpoint = metricsEndpoint + } + + // If endpoint is empty, server not registered with CCM, no error returned. + if endpoint == "" { + return nil, nil + } + + // Endpoint from CTW has no metrics path, so it must be added. + rawUrl := endpoint + metricsGatewayPath + u, err := url.ParseRequestURI(rawUrl) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidEndpoint, err) + } + + return u, nil +} + +// convertMetricFilters returns a valid regex used to filter metrics. 
+// if invalid filters are given, a defaults regex that allow all metrics is returned. +func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.Regexp { + logger := hclog.FromContext(ctx) + validFilters := make([]string, 0, len(payloadFilters)) + for _, filter := range payloadFilters { + _, err := regexp.Compile(filter) + if err != nil { + logger.Error("invalid filter", "error", err) + continue + } + validFilters = append(validFilters, filter) + } + + if len(validFilters) == 0 { + logger.Error("no valid filters") + return defaultMetricFilters + } + + // Combine the valid regex strings with OR. + finalRegex := strings.Join(validFilters, "|") + composedRegex, err := regexp.Compile(finalRegex) + if err != nil { + logger.Error("failed to compile final regex", "error", err) + return defaultMetricFilters + } + + return composedRegex +} + +// convertMetricLabels returns a set of string pairs that must be added as attributes to all exported telemetry data. +func convertMetricLabels(payloadLabels map[string]string, cfg config.CloudConfig) map[string]string { + labels := make(map[string]string) + nodeID := string(cfg.NodeID) + if nodeID != "" { + labels["node_id"] = nodeID + } + + if cfg.NodeName != "" { + labels["node_name"] = cfg.NodeName + } + + for k, v := range payloadLabels { + labels[k] = v + } + + return labels +} diff --git a/agent/hcp/client/telemetry_config_test.go b/agent/hcp/client/telemetry_config_test.go new file mode 100644 index 000000000000..42d3ee649802 --- /dev/null +++ b/agent/hcp/client/telemetry_config_test.go @@ -0,0 +1,377 @@ +package client + +import ( + "context" + "net/url" + "regexp" + "testing" + "time" + + "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service" + "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/models" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/hcp/config" + 
"github.com/hashicorp/consul/types" +) + +func TestValidateAgentTelemetryConfigPayload(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + resp *consul_telemetry_service.AgentTelemetryConfigOK + wantErr error + }{ + "errorsWithNilPayload": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{}, + wantErr: errMissingPayload, + }, + "errorsWithNilTelemetryConfig": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{}, + }, + }, + wantErr: errMissingTelemetryConfig, + }, + "errorsWithNilRefreshConfig": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{}, + }, + }, + wantErr: errMissingRefreshConfig, + }, + "errorsWithNilMetricsConfig": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{}, + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{}, + }, + }, + wantErr: errMissingMetricsConfig, + }, + "success": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ + Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{}, + }, + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{}, + }, + }, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + err := validateAgentTelemetryConfigPayload(tc.resp) + if tc.wantErr != nil { + require.ErrorIs(t, err, tc.wantErr) + return + } + 
require.NoError(t, err) + }) + } +} + +func TestConvertAgentTelemetryResponse(t *testing.T) { + validTestURL, err := url.Parse("https://test.com/v1/metrics") + require.NoError(t, err) + + validTestFilters, err := regexp.Compile("test|consul") + require.NoError(t, err) + + for name, tc := range map[string]struct { + resp *consul_telemetry_service.AgentTelemetryConfigOK + expectedTelemetryCfg *TelemetryConfig + wantErr error + expectedEnabled bool + }{ + "success": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ + Endpoint: "https://test.com", + Labels: map[string]string{"test": "test"}, + Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{ + IncludeList: []string{"test", "consul"}, + }, + }, + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{ + RefreshInterval: "2s", + }, + }, + }, + expectedTelemetryCfg: &TelemetryConfig{ + MetricsConfig: &MetricsConfig{ + Endpoint: validTestURL, + Labels: map[string]string{"test": "test"}, + Filters: validTestFilters, + }, + RefreshConfig: &RefreshConfig{ + RefreshInterval: 2 * time.Second, + }, + }, + expectedEnabled: true, + }, + "successNoEndpoint": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ + Endpoint: "", + Labels: map[string]string{"test": "test"}, + Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{ + IncludeList: []string{"test", "consul"}, + }, + }, + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{ + RefreshInterval: "2s", + }, + }, + }, + expectedTelemetryCfg: &TelemetryConfig{ + MetricsConfig: &MetricsConfig{ + Endpoint: nil, + Labels: map[string]string{"test": 
"test"}, + Filters: validTestFilters, + }, + RefreshConfig: &RefreshConfig{ + RefreshInterval: 2 * time.Second, + }, + }, + expectedEnabled: false, + }, + "successBadFilters": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ + Endpoint: "https://test.com", + Labels: map[string]string{"test": "test"}, + Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{ + IncludeList: []string{"[", "(*LF)"}, + }, + }, + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{ + RefreshInterval: "2s", + }, + }, + }, + expectedTelemetryCfg: &TelemetryConfig{ + MetricsConfig: &MetricsConfig{ + Endpoint: validTestURL, + Labels: map[string]string{"test": "test"}, + Filters: defaultMetricFilters, + }, + RefreshConfig: &RefreshConfig{ + RefreshInterval: 2 * time.Second, + }, + }, + expectedEnabled: true, + }, + "errorsWithInvalidRefreshInterval": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ + Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{}, + }, + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{ + RefreshInterval: "300ws", + }, + }, + }, + wantErr: errInvalidRefreshInterval, + }, + "errorsWithInvalidEndpoint": { + resp: &consul_telemetry_service.AgentTelemetryConfigOK{ + Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{ + TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{ + Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{ + Endpoint: " ", + }, + }, + RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{ + RefreshInterval: 
"1s", + }, + }, + }, + wantErr: errInvalidEndpoint, + }, + } { + t.Run(name, func(t *testing.T) { + telemetryCfg, err := convertAgentTelemetryResponse(context.Background(), tc.resp, config.CloudConfig{}) + if tc.wantErr != nil { + require.ErrorIs(t, err, tc.wantErr) + require.Nil(t, telemetryCfg) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedTelemetryCfg, telemetryCfg) + require.Equal(t, tc.expectedEnabled, telemetryCfg.MetricsEnabled()) + }) + } +} + +func TestConvertMetricEndpoint(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + endpoint string + override string + expected string + wantErr error + }{ + "success": { + endpoint: "https://test.com", + expected: "https://test.com/v1/metrics", + }, + "successMetricsOverride": { + endpoint: "https://test.com", + override: "https://override.com", + expected: "https://override.com/v1/metrics", + }, + "noErrorWithEmptyEndpoints": { + endpoint: "", + override: "", + expected: "", + }, + "errorWithInvalidURL": { + endpoint: " ", + override: "", + wantErr: errInvalidEndpoint, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + u, err := convertMetricEndpoint(tc.endpoint, tc.override) + if tc.wantErr != nil { + require.ErrorIs(t, err, tc.wantErr) + require.Empty(t, u) + return + } + + if tc.expected == "" { + require.Nil(t, u) + require.NoError(t, err) + return + } + + require.NotNil(t, u) + require.NoError(t, err) + require.Equal(t, tc.expected, u.String()) + }) + } + +} + +func TestConvertMetricFilters(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + filters []string + expectedRegexString string + matches []string + wantErr string + wantMatch bool + }{ + "badFilterRegex": { + filters: []string{"(*LF)"}, + expectedRegexString: defaultMetricFilters.String(), + matches: []string{"consul.raft.peers", "consul.mem.heap_size"}, + wantMatch: true, + }, + "emptyRegex": { + filters: []string{}, + expectedRegexString: 
defaultMetricFilters.String(), + matches: []string{"consul.raft.peers", "consul.mem.heap_size"}, + wantMatch: true, + }, + "matchFound": { + filters: []string{"raft.*", "mem.*"}, + expectedRegexString: "raft.*|mem.*", + matches: []string{"consul.raft.peers", "consul.mem.heap_size"}, + wantMatch: true, + }, + "matchNotFound": { + filters: []string{"mem.*"}, + matches: []string{"consul.raft.peers", "consul.txn.apply"}, + expectedRegexString: "mem.*", + wantMatch: false, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + f := convertMetricFilters(context.Background(), tc.filters) + + require.Equal(t, tc.expectedRegexString, f.String()) + for _, metric := range tc.matches { + m := f.MatchString(metric) + require.Equal(t, tc.wantMatch, m) + } + }) + } +} + +func TestConvertMetricLabels(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + payloadLabels map[string]string + cfg config.CloudConfig + expectedLabels map[string]string + }{ + "Success": { + payloadLabels: map[string]string{ + "ctw_label": "test", + }, + cfg: config.CloudConfig{ + NodeID: types.NodeID("nodeyid"), + NodeName: "nodey", + }, + expectedLabels: map[string]string{ + "ctw_label": "test", + "node_id": "nodeyid", + "node_name": "nodey", + }, + }, + + "NoNodeID": { + payloadLabels: map[string]string{ + "ctw_label": "test", + }, + cfg: config.CloudConfig{ + NodeID: types.NodeID(""), + NodeName: "nodey", + }, + expectedLabels: map[string]string{ + "ctw_label": "test", + "node_name": "nodey", + }, + }, + "NoNodeName": { + payloadLabels: map[string]string{ + "ctw_label": "test", + }, + cfg: config.CloudConfig{ + NodeID: types.NodeID("nodeyid"), + NodeName: "", + }, + expectedLabels: map[string]string{ + "ctw_label": "test", + "node_id": "nodeyid", + }, + }, + "Empty": { + cfg: config.CloudConfig{ + NodeID: "", + NodeName: "", + }, + expectedLabels: map[string]string{}, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + labels := 
convertMetricLabels(tc.payloadLabels, tc.cfg) + require.Equal(t, labels, tc.expectedLabels) + }) + } +} diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 951a3900288d..d0d7f48106da 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -3,7 +3,6 @@ package hcp import ( "context" "fmt" - "net/url" "time" "github.com/armon/go-metrics" @@ -41,7 +40,11 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) { return Deps{}, fmt.Errorf("failed to init metrics client: %w", err) } - sink := sink(ctx, client, metricsClient, cfg) + sink, err := sink(ctx, client, metricsClient) + if err != nil { + // Do not prevent server start if sink init fails, only log error. + logger.Error("failed to init sink", "error", err) + } return Deps{ Client: client, @@ -50,50 +53,44 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) { }, nil } -// sink provides initializes an OTELSink which forwards Consul metrics to HCP. +// sink initializes an OTELSink which forwards Consul metrics to HCP. // The sink is only initialized if the server is registered with the management plane (CCM). -// This step should not block server initialization, so errors are logged, but not returned. +// This step should not block server initialization, errors are returned, only to be logged. 
func sink( ctx context.Context, hcpClient hcpclient.Client, - metricsClient hcpclient.MetricsClient, - cfg config.CloudConfig, -) metrics.MetricSink { + metricsClient telemetry.MetricsClient, +) (metrics.MetricSink, error) { logger := hclog.FromContext(ctx).Named("sink") reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx) if err != nil { - logger.Error("failed to fetch telemetry config", "error", err) - return nil + return nil, fmt.Errorf("failed to fetch telemetry config: %w", err) } - endpoint, isEnabled := telemetryCfg.Enabled() - if !isEnabled { - return nil + if !telemetryCfg.MetricsEnabled() { + return nil, nil } - u, err := url.Parse(endpoint) + cfgProvider, err := NewHCPProvider(ctx, hcpClient, telemetryCfg) if err != nil { - logger.Error("failed to parse url endpoint", "error", err) - return nil + return nil, fmt.Errorf("failed to init config provider: %w", err) } + reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval) sinkOpts := &telemetry.OTELSinkOpts{ - Ctx: ctx, - Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval), - Labels: telemetryCfg.DefaultLabels(cfg), - Filters: telemetryCfg.MetricsConfig.Filters, + Reader: reader, + ConfigProvider: cfgProvider, } - sink, err := telemetry.NewOTELSink(sinkOpts) + sink, err := telemetry.NewOTELSink(ctx, sinkOpts) if err != nil { - logger.Error("failed to init OTEL sink", "error", err) - return nil + return nil, fmt.Errorf("failed create OTELSink: %w", err) } logger.Debug("initialized HCP metrics sink") - return sink + return sink, nil } diff --git a/agent/hcp/deps_test.go b/agent/hcp/deps_test.go index 9a90c26d50ad..8bab66bac3f3 100644 --- a/agent/hcp/deps_test.go +++ b/agent/hcp/deps_test.go @@ -3,98 +3,97 @@ package hcp import ( "context" "fmt" + "net/url" + "regexp" "testing" + "time" - "github.com/hashicorp/consul/agent/hcp/config" 
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/hcp/client" - "github.com/hashicorp/consul/types" + "github.com/hashicorp/consul/agent/hcp/telemetry" ) +type mockMetricsClient struct { + telemetry.MetricsClient +} + func TestSink(t *testing.T) { t.Parallel() for name, test := range map[string]struct { expect func(*client.MockClient) - cloudCfg config.CloudConfig + wantErr string expectedSink bool }{ "success": { expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - Endpoint: "https://test.com", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "https://test.com", - }, - }, nil) - }, - cloudCfg: config.CloudConfig{ - NodeID: types.NodeID("nodeyid"), - NodeName: "nodey", + u, _ := url.Parse("https://test.com/v1/metrics") + filters, _ := regexp.Compile("test") + mt := mockTelemetryConfig(1*time.Second, u, filters) + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil) }, expectedSink: true, }, - "noSinkWhenServerNotRegisteredWithCCM": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - Endpoint: "", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "", - }, - }, nil) - }, - cloudCfg: config.CloudConfig{}, - }, - "noSinkWhenCCMVerificationFails": { + "noSinkWhenFetchTelemetryConfigFails": { expect: func(mockClient *client.MockClient) { mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed")) }, - cloudCfg: config.CloudConfig{}, + wantErr: "failed to fetch telemetry config", }, - "failsWithFetchTelemetryFailure": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error")) - }, - }, - "failsWithURLParseErr": { + "noSinkWhenServerNotRegisteredWithCCM": { expect: func(mockClient 
*client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - // Minimum 2 chars for a domain to be valid. - Endpoint: "s", - MetricsConfig: &client.MetricsConfig{ - // Invalid domain chars - Endpoint: " ", - }, - }, nil) + mt := mockTelemetryConfig(1*time.Second, nil, nil) + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil) }, }, - "noErrWithEmptyEndpoint": { + "noSinkWhenTelemetryConfigProviderInitFails": { expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - Endpoint: "", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "", - }, - }, nil) + u, _ := url.Parse("https://test.com/v1/metrics") + // Bad refresh interval forces ConfigProvider creation failure. + mt := mockTelemetryConfig(0*time.Second, u, nil) + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil) }, + wantErr: "failed to init config provider", }, } { test := test t.Run(name, func(t *testing.T) { t.Parallel() c := client.NewMockClient(t) - mc := client.MockMetricsClient{} + mc := mockMetricsClient{} test.expect(c) ctx := context.Background() - s := sink(ctx, c, mc, test.cloudCfg) + s, err := sink(ctx, c, mc) + + if test.wantErr != "" { + require.NotNil(t, err) + require.Contains(t, err.Error(), test.wantErr) + require.Nil(t, s) + return + } + if !test.expectedSink { require.Nil(t, s) + require.Nil(t, err) return } + require.NotNil(t, s) }) } } + +func mockTelemetryConfig(refreshInterval time.Duration, metricsEndpoint *url.URL, filters *regexp.Regexp) *client.TelemetryConfig { + return &client.TelemetryConfig{ + MetricsConfig: &client.MetricsConfig{ + Endpoint: metricsEndpoint, + Filters: filters, + }, + RefreshConfig: &client.RefreshConfig{ + RefreshInterval: refreshInterval, + }, + } +} diff --git a/agent/hcp/manager_test.go b/agent/hcp/manager_test.go index fbde8f29019b..2363739d6206 100644 --- a/agent/hcp/manager_test.go 
+++ b/agent/hcp/manager_test.go @@ -55,7 +55,11 @@ func TestManager_SendUpdate(t *testing.T) { StatusFn: statusF, }) mgr.testUpdateSent = updateCh - go mgr.Run(context.Background()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go mgr.Run(ctx) select { case <-updateCh: case <-time.After(time.Second): @@ -87,7 +91,11 @@ func TestManager_SendUpdate_Periodic(t *testing.T) { MinInterval: 100 * time.Millisecond, }) mgr.testUpdateSent = updateCh - go mgr.Run(context.Background()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go mgr.Run(ctx) select { case <-updateCh: case <-time.After(time.Second): diff --git a/agent/hcp/telemetry/custom_metrics.go b/agent/hcp/telemetry/custom_metrics.go index 746dc56cbe41..d691dccde207 100644 --- a/agent/hcp/telemetry/custom_metrics.go +++ b/agent/hcp/telemetry/custom_metrics.go @@ -6,7 +6,7 @@ package telemetry var ( internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"} - internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "sucess"} + internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "success"} internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"} internalMetricExporterShutdown []string = []string{"hcp", "otel", "exporter", "shutdown"} diff --git a/agent/hcp/telemetry/filter.go b/agent/hcp/telemetry/filter.go deleted file mode 100644 index 54dca7d44aef..000000000000 --- a/agent/hcp/telemetry/filter.go +++ /dev/null @@ -1,37 +0,0 @@ -package telemetry - -import ( - "fmt" - "regexp" - "strings" - - "github.com/hashicorp/go-multierror" -) - -// newFilterRegex returns a valid regex used to filter metrics. -// It will fail if there are 0 valid regex filters given. 
-func newFilterRegex(filters []string) (*regexp.Regexp, error) { - var mErr error - validFilters := make([]string, 0, len(filters)) - for _, filter := range filters { - _, err := regexp.Compile(filter) - if err != nil { - mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter %q failed: %w", filter, err)) - continue - } - validFilters = append(validFilters, filter) - } - - if len(validFilters) == 0 { - return nil, multierror.Append(mErr, fmt.Errorf("no valid filters")) - } - - // Combine the valid regex strings with an OR. - finalRegex := strings.Join(validFilters, "|") - composedRegex, err := regexp.Compile(finalRegex) - if err != nil { - return nil, fmt.Errorf("failed to compile regex: %w", err) - } - - return composedRegex, nil -} diff --git a/agent/hcp/telemetry/filter_test.go b/agent/hcp/telemetry/filter_test.go deleted file mode 100644 index abe962f4cd47..000000000000 --- a/agent/hcp/telemetry/filter_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package telemetry - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestFilter(t *testing.T) { - t.Parallel() - for name, tc := range map[string]struct { - filters []string - expectedRegexString string - matches []string - wantErr string - wantMatch bool - }{ - "badFilterRegex": { - filters: []string{"(*LF)"}, - wantErr: "no valid filters", - }, - "failsWithNoRegex": { - filters: []string{}, - wantErr: "no valid filters", - }, - "matchFound": { - filters: []string{"raft.*", "mem.*"}, - expectedRegexString: "raft.*|mem.*", - matches: []string{"consul.raft.peers", "consul.mem.heap_size"}, - wantMatch: true, - }, - "matchNotFound": { - filters: []string{"mem.*"}, - matches: []string{"consul.raft.peers", "consul.txn.apply"}, - expectedRegexString: "mem.*", - wantMatch: false, - }, - } { - tc := tc - t.Run(name, func(t *testing.T) { - t.Parallel() - f, err := newFilterRegex(tc.filters) - - if tc.wantErr != "" { - require.Error(t, err) - require.Contains(t, err.Error(), tc.wantErr) - return - 
} - - require.NoError(t, err) - require.Equal(t, tc.expectedRegexString, f.String()) - for _, metric := range tc.matches { - m := f.MatchString(metric) - require.Equal(t, tc.wantMatch, m) - } - }) - } -} diff --git a/agent/hcp/telemetry/otel_exporter.go b/agent/hcp/telemetry/otel_exporter.go index 76c8f5b000b5..084657816e0c 100644 --- a/agent/hcp/telemetry/otel_exporter.go +++ b/agent/hcp/telemetry/otel_exporter.go @@ -9,23 +9,34 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - - hcpclient "github.com/hashicorp/consul/agent/hcp/client" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) +// MetricsClient exports Consul metrics in OTLP format to the desired endpoint. +type MetricsClient interface { + ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error +} + +// EndpointProvider provides the endpoint where metrics are exported to by the OTELExporter. +// EndpointProvider exposes the GetEndpoint() interface method to fetch the endpoint. +// This abstraction layer offers flexibility, in particular for dynamic configuration or changes to the endpoint. +type EndpointProvider interface { + GetEndpoint() *url.URL +} + // OTELExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter. // The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics. // This allows us to use a custom client - HCP authenticated MetricsClient. type OTELExporter struct { - client hcpclient.MetricsClient - endpoint *url.URL + client MetricsClient + endpointProvider EndpointProvider } -// NewOTELExporter returns a configured OTELExporter -func NewOTELExporter(client hcpclient.MetricsClient, endpoint *url.URL) *OTELExporter { +// NewOTELExporter returns a configured OTELExporter. 
+func NewOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *OTELExporter { return &OTELExporter{ - client: client, - endpoint: endpoint, + client: client, + endpointProvider: endpointProvider, } } @@ -54,11 +65,17 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre // Export serializes and transmits metric data to a receiver. func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error { + endpoint := e.endpointProvider.GetEndpoint() + if endpoint == nil { + return nil + } + otlpMetrics := transformOTLP(metrics) if isEmpty(otlpMetrics) { return nil } - err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String()) + + err := e.client.ExportMetrics(ctx, otlpMetrics, endpoint.String()) if err != nil { goMetrics.IncrCounter(internalMetricExportFailure, 1) return fmt.Errorf("failed to export metrics: %w", err) diff --git a/agent/hcp/telemetry/otel_exporter_test.go b/agent/hcp/telemetry/otel_exporter_test.go index bc1a626f1c16..53b7bd316094 100644 --- a/agent/hcp/telemetry/otel_exporter_test.go +++ b/agent/hcp/telemetry/otel_exporter_test.go @@ -15,8 +15,10 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" +) - "github.com/hashicorp/consul/agent/hcp/client" +const ( + testExportEndpoint = "https://test.com/v1/metrics" ) type mockMetricsClient struct { @@ -27,6 +29,12 @@ func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *met return m.exportErr } +type mockEndpointProvider struct { + endpoint *url.URL +} + +func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint } + func TestTemporality(t *testing.T) { t.Parallel() exp := &OTELExporter{} @@ -64,10 +72,15 @@ func TestAggregation(t *testing.T) { func TestExport(t *testing.T) { t.Parallel() for name, test := range map[string]struct { - wantErr string - metrics *metricdata.ResourceMetrics - 
client client.MetricsClient + wantErr string + metrics *metricdata.ResourceMetrics + client MetricsClient + provider EndpointProvider }{ + "earlyReturnWithoutEndpoint": { + client: &mockMetricsClient{}, + provider: &mockEndpointProvider{}, + }, "earlyReturnWithoutScopeMetrics": { client: &mockMetricsClient{}, metrics: mutateMetrics(nil), @@ -100,7 +113,16 @@ func TestExport(t *testing.T) { test := test t.Run(name, func(t *testing.T) { t.Parallel() - exp := NewOTELExporter(test.client, &url.URL{}) + provider := test.provider + if provider == nil { + u, err := url.Parse(testExportEndpoint) + require.NoError(t, err) + provider = &mockEndpointProvider{ + endpoint: u, + } + } + + exp := NewOTELExporter(test.client, provider) err := exp.Export(context.Background(), test.metrics) if test.wantErr != "" { @@ -119,7 +141,7 @@ func TestExport(t *testing.T) { // sets a shared global sink. func TestExport_CustomMetrics(t *testing.T) { for name, tc := range map[string]struct { - client client.MetricsClient + client MetricsClient metricKey []string operation string }{ @@ -154,7 +176,12 @@ func TestExport_CustomMetrics(t *testing.T) { metrics.NewGlobal(cfg, sink) // Perform operation that emits metric. 
- exp := NewOTELExporter(tc.client, &url.URL{}) + u, err := url.Parse(testExportEndpoint) + require.NoError(t, err) + + exp := NewOTELExporter(tc.client, &mockEndpointProvider{ + endpoint: u, + }) ctx := context.Background() switch tc.operation { diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go index 39e9aa599cd8..49a6d595076c 100644 --- a/agent/hcp/telemetry/otel_sink.go +++ b/agent/hcp/telemetry/otel_sink.go @@ -3,8 +3,7 @@ package telemetry import ( "bytes" "context" - "fmt" - "net/url" + "errors" "regexp" "strings" "sync" @@ -16,19 +15,24 @@ import ( otelmetric "go.opentelemetry.io/otel/metric" otelsdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" - - "github.com/hashicorp/consul/agent/hcp/client" ) // DefaultExportInterval is a default time interval between export of aggregated metrics. const DefaultExportInterval = 10 * time.Second +// ConfigProvider is required to provide custom metrics processing. +type ConfigProvider interface { + // GetLabels should return a set of OTEL attributes added by default all metrics. + GetLabels() map[string]string + // GetFilters should return filtesr that are required to enable metric processing. + // Filters act as an allowlist to collect only the required metrics. + GetFilters() *regexp.Regexp +} + // OTELSinkOpts is used to provide configuration when initializing an OTELSink using NewOTELSink. type OTELSinkOpts struct { - Reader otelsdk.Reader - Ctx context.Context - Filters []string - Labels map[string]string + Reader otelsdk.Reader + ConfigProvider ConfigProvider } // OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification. @@ -38,7 +42,7 @@ type OTELSink struct { // spaceReplacer cleans the flattened key by removing any spaces. spaceReplacer *strings.Replacer logger hclog.Logger - filters *regexp.Regexp + cfgProvider ConfigProvider // meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK. 
// It handles reading/export of aggregated metric data. @@ -68,45 +72,32 @@ type OTELSink struct { // NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds. // It configures the reader with a custom OTELExporter with a MetricsClient to transform and export // metrics in OTLP format to an external url. -func NewOTELReader(client client.MetricsClient, url *url.URL, exportInterval time.Duration) otelsdk.Reader { - exporter := NewOTELExporter(client, url) +func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider, exportInterval time.Duration) otelsdk.Reader { + exporter := NewOTELExporter(client, endpointProvider) return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval)) } // NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface. // It sets up a MeterProvider and Meter, key pieces of the OTEL Metrics SDK which // enable us to create OTEL Instruments to record measurements. -func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { +func NewOTELSink(ctx context.Context, opts *OTELSinkOpts) (*OTELSink, error) { if opts.Reader == nil { - return nil, fmt.Errorf("ferror: provide valid reader") + return nil, errors.New("ferror: provide valid reader") } - if opts.Ctx == nil { - return nil, fmt.Errorf("ferror: provide valid context") + if opts.ConfigProvider == nil { + return nil, errors.New("ferror: provide valid config provider") } - logger := hclog.FromContext(opts.Ctx).Named("otel_sink") - - filterList, err := newFilterRegex(opts.Filters) - if err != nil { - logger.Error("Failed to initialize all filters", "error", err) - } + logger := hclog.FromContext(ctx).Named("otel_sink") - attrs := make([]attribute.KeyValue, 0, len(opts.Labels)) - for k, v := range opts.Labels { - kv := attribute.KeyValue{ - Key: attribute.Key(k), - Value: attribute.StringValue(v), - } - attrs = append(attrs, kv) - } // Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically. 
- res := resource.NewWithAttributes("", attrs...) + res := resource.NewSchemaless() meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader)) meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry") return &OTELSink{ - filters: filterList, + cfgProvider: opts.ConfigProvider, spaceReplacer: strings.NewReplacer(" ", "_"), logger: logger, meterProvider: meterProvider, @@ -138,12 +129,12 @@ func (o *OTELSink) IncrCounter(key []string, val float32) { func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) - if !o.filters.MatchString(k) { + if !o.allowedMetric(k) { return } // Set value in global Gauge store. - o.gaugeStore.Set(k, float64(val), toAttributes(labels)) + o.gaugeStore.Set(k, float64(val), o.labelsToAttributes(labels)) o.mutex.Lock() defer o.mutex.Unlock() @@ -166,7 +157,7 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) - if !o.filters.MatchString(k) { + if !o.allowedMetric(k) { return } @@ -184,7 +175,7 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet o.histogramInstruments[k] = inst } - attrs := toAttributes(labels) + attrs := o.labelsToAttributes(labels) inst.Record(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...)) } @@ -192,7 +183,7 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) - if !o.filters.MatchString(k) { + if !o.allowedMetric(k) { return } @@ -211,7 +202,7 @@ func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gom o.counterInstruments[k] = inst } - attrs := toAttributes(labels) + attrs := o.labelsToAttributes(labels) inst.Add(context.TODO(), 
float64(val), otelmetric.WithAttributes(attrs...)) } @@ -228,17 +219,39 @@ func (o *OTELSink) flattenKey(parts []string) string { return buf.String() } -// toAttributes converts go metrics Labels into OTEL format []attributes.KeyValue -func toAttributes(labels []gometrics.Label) []attribute.KeyValue { - if len(labels) == 0 { - return nil +// filter checks the filter allowlist, if it exists, to verify if this metric should be recorded. +func (o *OTELSink) allowedMetric(key string) bool { + if filters := o.cfgProvider.GetFilters(); filters != nil { + return filters.MatchString(key) + } + + return true +} + +// labelsToAttributes converts go metrics and provider labels into OTEL format []attributes.KeyValue +func (o *OTELSink) labelsToAttributes(goMetricsLabels []gometrics.Label) []attribute.KeyValue { + providerLabels := o.cfgProvider.GetLabels() + + length := len(goMetricsLabels) + len(providerLabels) + if length == 0 { + return []attribute.KeyValue{} } - attrs := make([]attribute.KeyValue, len(labels)) - for i, label := range labels { - attrs[i] = attribute.KeyValue{ + + attrs := make([]attribute.KeyValue, 0, length) + // Convert provider labels to OTEL attributes. + for _, label := range goMetricsLabels { + attrs = append(attrs, attribute.KeyValue{ Key: attribute.Key(label.Name), Value: attribute.StringValue(label.Value), - } + }) + } + + // Convert provider labels to OTEL attributes. 
+ for k, v := range providerLabels { + attrs = append(attrs, attribute.KeyValue{ + Key: attribute.Key(k), + Value: attribute.StringValue(v), + }) } return attrs diff --git a/agent/hcp/telemetry/otel_sink_test.go b/agent/hcp/telemetry/otel_sink_test.go index 34127bdf9d77..509dde299d62 100644 --- a/agent/hcp/telemetry/otel_sink_test.go +++ b/agent/hcp/telemetry/otel_sink_test.go @@ -3,6 +3,7 @@ package telemetry import ( "context" "fmt" + "regexp" "sort" "strings" "sync" @@ -16,15 +17,32 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) +type mockConfigProvider struct { + filter *regexp.Regexp + labels map[string]string +} + +func (m *mockConfigProvider) GetLabels() map[string]string { + return m.labels +} + +func (m *mockConfigProvider) GetFilters() *regexp.Regexp { + return m.filter +} + var ( - expectedResource = resource.NewWithAttributes("", attribute.KeyValue{ + expectedResource = resource.NewSchemaless() + + attrs = attribute.NewSet(attribute.KeyValue{ Key: attribute.Key("node_id"), Value: attribute.StringValue("test"), }) - - attrs = attribute.NewSet(attribute.KeyValue{ + attrsWithMetricLabel = attribute.NewSet(attribute.KeyValue{ Key: attribute.Key("metric.label"), Value: attribute.StringValue("test"), + }, attribute.KeyValue{ + Key: attribute.Key("node_id"), + Value: attribute.StringValue("test"), }) expectedSinkMetrics = map[string]metricdata.Metrics{ @@ -35,7 +53,7 @@ var ( Data: metricdata.Gauge[float64]{ DataPoints: []metricdata.DataPoint[float64]{ { - Attributes: *attribute.EmptySet(), + Attributes: attrs, Value: float64(float32(0)), }, }, @@ -48,7 +66,7 @@ var ( Data: metricdata.Gauge[float64]{ DataPoints: []metricdata.DataPoint[float64]{ { - Attributes: attrs, + Attributes: attrsWithMetricLabel, Value: float64(float32(1.23)), }, }, @@ -61,7 +79,7 @@ var ( Data: metricdata.Sum[float64]{ DataPoints: []metricdata.DataPoint[float64]{ { - Attributes: *attribute.EmptySet(), + Attributes: attrs, Value: float64(float32(23.23)), }, }, @@ -74,7 +92,7 @@ 
var ( Data: metricdata.Sum[float64]{ DataPoints: []metricdata.DataPoint[float64]{ { - Attributes: attrs, + Attributes: attrsWithMetricLabel, Value: float64(float32(1.44)), }, }, @@ -87,7 +105,7 @@ var ( Data: metricdata.Histogram[float64]{ DataPoints: []metricdata.HistogramDataPoint[float64]{ { - Attributes: *attribute.EmptySet(), + Attributes: attrs, Count: 1, Sum: float64(float32(45.32)), Min: metricdata.NewExtrema(float64(float32(45.32))), @@ -103,7 +121,7 @@ var ( Data: metricdata.Histogram[float64]{ DataPoints: []metricdata.HistogramDataPoint[float64]{ { - Attributes: attrs, + Attributes: attrsWithMetricLabel, Count: 1, Sum: float64(float32(26.34)), Min: metricdata.NewExtrema(float64(float32(26.34))), @@ -121,34 +139,30 @@ func TestNewOTELSink(t *testing.T) { wantErr string opts *OTELSinkOpts }{ - "failsWithEmptyLogger": { - wantErr: "ferror: provide valid context", + "failsWithEmptyReader": { + wantErr: "ferror: provide valid reader", opts: &OTELSinkOpts{ - Reader: metric.NewManualReader(), + Reader: nil, + ConfigProvider: &mockConfigProvider{}, }, }, - "failsWithEmptyReader": { - wantErr: "ferror: provide valid reader", + "failsWithEmptyConfigProvider": { + wantErr: "ferror: provide valid config provider", opts: &OTELSinkOpts{ - Reader: nil, - Ctx: context.Background(), + Reader: metric.NewManualReader(), }, }, "success": { opts: &OTELSinkOpts{ - Ctx: context.Background(), - Reader: metric.NewManualReader(), - Labels: map[string]string{ - "server": "test", - }, - Filters: []string{"raft"}, + Reader: metric.NewManualReader(), + ConfigProvider: &mockConfigProvider{}, }, }, } { test := test t.Run(name, func(t *testing.T) { t.Parallel() - sink, err := NewOTELSink(test.opts) + sink, err := NewOTELSink(context.Background(), test.opts) if test.wantErr != "" { require.Error(t, err) require.Contains(t, err.Error(), test.wantErr) @@ -168,15 +182,16 @@ func TestOTELSink(t *testing.T) { ctx := context.Background() opts := &OTELSinkOpts{ - Reader: reader, - Ctx: ctx, - 
Filters: []string{"raft", "autopilot"}, - Labels: map[string]string{ - "node_id": "test", + Reader: reader, + ConfigProvider: &mockConfigProvider{ + filter: regexp.MustCompile("raft|autopilot"), + labels: map[string]string{ + "node_id": "test", + }, }, } - sink, err := NewOTELSink(opts) + sink, err := NewOTELSink(ctx, opts) require.NoError(t, err) labels := []gometrics.Label{ @@ -186,12 +201,15 @@ func TestOTELSink(t *testing.T) { }, } + sink.SetGauge([]string{"test", "bad_filter", "gauge"}, float32(0)) sink.SetGauge([]string{"consul", "raft", "leader"}, float32(0)) sink.SetGaugeWithLabels([]string{"consul", "autopilot", "healthy"}, float32(1.23), labels) + sink.IncrCounter([]string{"test", "bad_filter", "counter"}, float32(23.23)) sink.IncrCounter([]string{"consul", "raft", "state", "leader"}, float32(23.23)) sink.IncrCounterWithLabels([]string{"consul", "raft", "apply"}, float32(1.44), labels) + sink.AddSample([]string{"test", "bad_filter", "sample"}, float32(45.32)) sink.AddSample([]string{"consul", "raft", "leader", "lastContact"}, float32(45.32)) sink.AddSampleWithLabels([]string{"consul", "raft", "commitTime"}, float32(26.34), labels) @@ -202,23 +220,147 @@ func TestOTELSink(t *testing.T) { isSame(t, expectedSinkMetrics, collected) } +func TestLabelsToAttributes(t *testing.T) { + for name, test := range map[string]struct { + providerLabels map[string]string + goMetricsLabels []gometrics.Label + expectedOTELAttributes []attribute.KeyValue + }{ + "emptyLabels": { + expectedOTELAttributes: []attribute.KeyValue{}, + }, + "emptyGoMetricsLabels": { + providerLabels: map[string]string{ + "node_id": "test", + }, + expectedOTELAttributes: []attribute.KeyValue{ + { + Key: attribute.Key("node_id"), + Value: attribute.StringValue("test"), + }, + }, + }, + "emptyProviderLabels": { + goMetricsLabels: []gometrics.Label{ + { + Name: "server_type", + Value: "internal", + }, + }, + expectedOTELAttributes: []attribute.KeyValue{ + { + Key: attribute.Key("server_type"), + Value: 
attribute.StringValue("internal"), + }, + }, + }, + "combinedLabels": { + goMetricsLabels: []gometrics.Label{ + { + Name: "server_type", + Value: "internal", + }, + { + Name: "method", + Value: "get", + }, + }, + providerLabels: map[string]string{ + "node_id": "test", + "node_name": "labels_test", + }, + expectedOTELAttributes: []attribute.KeyValue{ + { + Key: attribute.Key("server_type"), + Value: attribute.StringValue("internal"), + }, + { + Key: attribute.Key("method"), + Value: attribute.StringValue("get"), + }, + { + Key: attribute.Key("node_id"), + Value: attribute.StringValue("test"), + }, + { + Key: attribute.Key("node_name"), + Value: attribute.StringValue("labels_test"), + }, + }, + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + opts := &OTELSinkOpts{ + Reader: metric.NewManualReader(), + ConfigProvider: &mockConfigProvider{ + filter: regexp.MustCompile("raft|autopilot"), + labels: test.providerLabels, + }, + } + sink, err := NewOTELSink(ctx, opts) + require.NoError(t, err) + + require.Equal(t, test.expectedOTELAttributes, sink.labelsToAttributes(test.goMetricsLabels)) + }) + } +} + +func TestOTELSinkFilters(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + cfgProvider ConfigProvider + expected bool + }{ + "emptyMatch": { + cfgProvider: &mockConfigProvider{}, + expected: true, + }, + "matchingFilter": { + cfgProvider: &mockConfigProvider{ + filter: regexp.MustCompile("raft"), + }, + expected: true, + }, + "mismatchFilter": {cfgProvider: &mockConfigProvider{ + filter: regexp.MustCompile("test"), + }}, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + testMetricKey := "consul.raft" + s, err := NewOTELSink(context.Background(), &OTELSinkOpts{ + ConfigProvider: tc.cfgProvider, + Reader: metric.NewManualReader(), + }) + require.NoError(t, err) + require.Equal(t, tc.expected, s.allowedMetric(testMetricKey)) + }) + } +} + func TestOTELSink_Race(t *testing.T) 
{ reader := metric.NewManualReader() ctx := context.Background() + defaultLabels := map[string]string{ + "node_id": "test", + } opts := &OTELSinkOpts{ - Ctx: ctx, Reader: reader, - Labels: map[string]string{ - "node_id": "test", + ConfigProvider: &mockConfigProvider{ + filter: regexp.MustCompile("test"), + labels: defaultLabels, }, - Filters: []string{"test"}, } - sink, err := NewOTELSink(opts) + sink, err := NewOTELSink(context.Background(), opts) require.NoError(t, err) samples := 100 - expectedMetrics := generateSamples(samples) + expectedMetrics := generateSamples(samples, defaultLabels) wg := &sync.WaitGroup{} errCh := make(chan error, samples) for k, v := range expectedMetrics { @@ -240,8 +382,17 @@ func TestOTELSink_Race(t *testing.T) { } // generateSamples generates n of each gauges, counter and histogram measurements to use for test purposes. -func generateSamples(n int) map[string]metricdata.Metrics { +func generateSamples(n int, labels map[string]string) map[string]metricdata.Metrics { generated := make(map[string]metricdata.Metrics, 3*n) + attrs := *attribute.EmptySet() + + kvs := make([]attribute.KeyValue, 0, len(labels)) + for k, v := range labels { + kvs = append(kvs, attribute.KeyValue{Key: attribute.Key(k), Value: attribute.StringValue(v)}) + } + if len(kvs) > 0 { + attrs = attribute.NewSet(kvs...) 
+ } for i := 0; i < n; i++ { v := 12.3 @@ -251,7 +402,7 @@ func generateSamples(n int) map[string]metricdata.Metrics { Data: metricdata.Gauge[float64]{ DataPoints: []metricdata.DataPoint[float64]{ { - Attributes: *attribute.EmptySet(), + Attributes: attrs, Value: float64(float32(v)), }, }, @@ -267,7 +418,7 @@ func generateSamples(n int) map[string]metricdata.Metrics { Data: metricdata.Sum[float64]{ DataPoints: []metricdata.DataPoint[float64]{ { - Attributes: *attribute.EmptySet(), + Attributes: attrs, Value: float64(float32(v)), }, }, @@ -284,7 +435,7 @@ func generateSamples(n int) map[string]metricdata.Metrics { Data: metricdata.Histogram[float64]{ DataPoints: []metricdata.HistogramDataPoint[float64]{ { - Attributes: *attribute.EmptySet(), + Attributes: attrs, Sum: float64(float32(v)), Max: metricdata.NewExtrema(float64(float32(v))), Min: metricdata.NewExtrema(float64(float32(v))), diff --git a/agent/hcp/telemetry_provider.go b/agent/hcp/telemetry_provider.go new file mode 100644 index 000000000000..eb0f23e804f3 --- /dev/null +++ b/agent/hcp/telemetry_provider.go @@ -0,0 +1,156 @@ +package hcp + +import ( + "context" + "fmt" + "net/url" + "regexp" + "sync" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/agent/hcp/client" + "github.com/hashicorp/consul/agent/hcp/telemetry" +) + +var ( + // internalMetricRefreshFailure is a metric to monitor refresh failures. + internalMetricRefreshFailure []string = []string{"hcp", "telemetry_config_provider", "refresh", "failure"} + // internalMetricRefreshSuccess is a metric to monitor refresh successes. + internalMetricRefreshSuccess []string = []string{"hcp", "telemetry_config_provider", "refresh", "success"} +) + +// Ensure hcpProviderImpl implements telemetry provider interfaces. 
+var _ telemetry.ConfigProvider = &hcpProviderImpl{} +var _ telemetry.EndpointProvider = &hcpProviderImpl{} + +// hcpProviderImpl holds telemetry configuration and settings for continuous fetch of new config from HCP. +// It replaces the held configuration after every successful refresh. +type hcpProviderImpl struct { + // cfg holds configuration that can be dynamically updated. + cfg *dynamicConfig + + // A reader-writer mutex is used as the provider is read heavy. + // OTEL components access cfg during metrics collection and export (read). + // Meanwhile, cfg is only replaced when a refresh succeeds (write). + rw sync.RWMutex + // hcpClient is an authenticated client used to make HTTP requests to HCP. + hcpClient client.Client +} + +// dynamicConfig is a set of configurable settings for metrics collection, processing and export. +// NOTE(review): fields are exported, originally for a hash-based equals method that is not visible here; confirm it is still needed. +type dynamicConfig struct { + Endpoint *url.URL + Labels map[string]string + Filters *regexp.Regexp + // RefreshInterval controls the interval at which configuration is fetched from HCP to refresh config. + RefreshInterval time.Duration +} + +// NewHCPProvider initializes and starts an HCP Telemetry provider with provided params. +func NewHCPProvider(ctx context.Context, hcpClient client.Client, telemetryCfg *client.TelemetryConfig) (*hcpProviderImpl, error) { + refreshInterval := telemetryCfg.RefreshConfig.RefreshInterval + // refreshInterval must be greater than 0, otherwise time.NewTicker panics.
+ if refreshInterval <= 0 { + return nil, fmt.Errorf("invalid refresh interval: %d", refreshInterval) + } + + cfg := &dynamicConfig{ + Endpoint: telemetryCfg.MetricsConfig.Endpoint, + Labels: telemetryCfg.MetricsConfig.Labels, + Filters: telemetryCfg.MetricsConfig.Filters, + RefreshInterval: refreshInterval, + } + + t := &hcpProviderImpl{ + cfg: cfg, + hcpClient: hcpClient, + } + + go t.run(ctx, refreshInterval) + + return t, nil +} + +// run continuously checks for updates to the telemetry configuration by making a request to HCP on every tick, until ctx is done. +func (h *hcpProviderImpl) run(ctx context.Context, refreshInterval time.Duration) { + ticker := time.NewTicker(refreshInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if newCfg := h.getUpdate(ctx); newCfg != nil { + ticker.Reset(newCfg.RefreshInterval) + } + case <-ctx.Done(): + return + } + } +} + +// getUpdate makes an HTTP request to HCP to return a new metrics configuration +// and updates the hcpProviderImpl; it returns nil when the refresh fails. +func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig { + logger := hclog.FromContext(ctx).Named("telemetry_config_provider") + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + telemetryCfg, err := h.hcpClient.FetchTelemetryConfig(ctx) + if err != nil { + logger.Error("failed to fetch telemetry config from HCP", "error", err) + metrics.IncrCounter(internalMetricRefreshFailure, 1) + return nil + } + + // A newRefreshInterval of 0 or less would make ticker.Reset() panic.
+ newRefreshInterval := telemetryCfg.RefreshConfig.RefreshInterval + if newRefreshInterval <= 0 { + logger.Error("invalid refresh interval duration", "refreshInterval", newRefreshInterval) + metrics.IncrCounter(internalMetricRefreshFailure, 1) + return nil + } + + newDynamicConfig := &dynamicConfig{ + Filters: telemetryCfg.MetricsConfig.Filters, + Endpoint: telemetryCfg.MetricsConfig.Endpoint, + Labels: telemetryCfg.MetricsConfig.Labels, + RefreshInterval: newRefreshInterval, + } + + // Acquire write lock to update new configuration. + h.rw.Lock() + h.cfg = newDynamicConfig + h.rw.Unlock() + + metrics.IncrCounter(internalMetricRefreshSuccess, 1) + + return newDynamicConfig +} + +// GetEndpoint acquires a read lock to return endpoint configuration for consumers. +func (h *hcpProviderImpl) GetEndpoint() *url.URL { + h.rw.RLock() + defer h.rw.RUnlock() + + return h.cfg.Endpoint +} + +// GetFilters acquires a read lock to return filters configuration for consumers. +func (h *hcpProviderImpl) GetFilters() *regexp.Regexp { + h.rw.RLock() + defer h.rw.RUnlock() + + return h.cfg.Filters +} + +// GetLabels acquires a read lock to return labels configuration for consumers. 
+func (h *hcpProviderImpl) GetLabels() map[string]string { + h.rw.RLock() + defer h.rw.RUnlock() + + return h.cfg.Labels +} diff --git a/agent/hcp/telemetry_provider_test.go b/agent/hcp/telemetry_provider_test.go new file mode 100644 index 000000000000..684593b4f38b --- /dev/null +++ b/agent/hcp/telemetry_provider_test.go @@ -0,0 +1,384 @@ +package hcp + +import ( + "context" + "fmt" + "net/url" + "regexp" + "strings" + "sync" + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/hcp/client" +) + +const ( + testRefreshInterval = 100 * time.Millisecond + testSinkServiceName = "test.telemetry_config_provider" + testRaceWriteSampleCount = 100 + testRaceReadSampleCount = 5000 +) + +var ( + // Test constants to verify inmem sink metrics. + testMetricKeyFailure = testSinkServiceName + "." + strings.Join(internalMetricRefreshFailure, ".") + testMetricKeySuccess = testSinkServiceName + "." 
+ strings.Join(internalMetricRefreshSuccess, ".") +) + +type testConfig struct { + filters string + endpoint string + labels map[string]string + refreshInterval time.Duration +} + +func TestNewTelemetryConfigProvider(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + testInputs *testConfig + wantErr string + }{ + "success": { + testInputs: &testConfig{ + refreshInterval: 1 * time.Second, + }, + }, + "failsWithInvalidRefreshInterval": { + testInputs: &testConfig{ + refreshInterval: 0 * time.Second, + }, + wantErr: "invalid refresh interval", + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testCfg, err := testTelemetryCfg(tc.testInputs) + require.NoError(t, err) + + cfgProvider, err := NewHCPProvider(ctx, client.NewMockClient(t), testCfg) + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) + require.Nil(t, cfgProvider) + return + } + require.NotNil(t, cfgProvider) + }) + } +} + +func TestTelemetryConfigProviderGetUpdate(t *testing.T) { + for name, tc := range map[string]struct { + mockExpect func(*client.MockClient) + metricKey string + optsInputs *testConfig + expected *testConfig + }{ + "noChanges": { + optsInputs: &testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, + }, + mockExpect: func(m *client.MockClient) { + mockCfg, _ := testTelemetryCfg(&testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, + }) + m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil) + }, + expected: &testConfig{ + endpoint: "http://test.com/v1/metrics", + labels: map[string]string{ + "test_label": "123", + }, + filters: "test", + refreshInterval: testRefreshInterval, + }, + 
metricKey: testMetricKeySuccess, + }, + "newConfig": { + optsInputs: &testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: 2 * time.Second, + }, + mockExpect: func(m *client.MockClient) { + mockCfg, _ := testTelemetryCfg(&testConfig{ + endpoint: "http://newendpoint/v1/metrics", + filters: "consul", + labels: map[string]string{ + "new_label": "1234", + }, + refreshInterval: 2 * time.Second, + }) + m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil) + }, + expected: &testConfig{ + endpoint: "http://newendpoint/v1/metrics", + filters: "consul", + labels: map[string]string{ + "new_label": "1234", + }, + refreshInterval: 2 * time.Second, + }, + metricKey: testMetricKeySuccess, + }, + "sameConfigInvalidRefreshInterval": { + optsInputs: &testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, + }, + mockExpect: func(m *client.MockClient) { + mockCfg, _ := testTelemetryCfg(&testConfig{ + refreshInterval: 0 * time.Second, + }) + m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil) + }, + expected: &testConfig{ + endpoint: "http://test.com/v1/metrics", + labels: map[string]string{ + "test_label": "123", + }, + filters: "test", + refreshInterval: testRefreshInterval, + }, + metricKey: testMetricKeyFailure, + }, + "sameConfigHCPClientFailure": { + optsInputs: &testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, + }, + mockExpect: func(m *client.MockClient) { + m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("failure")) + }, + expected: &testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, 
+ }, + metricKey: testMetricKeyFailure, + }, + } { + t.Run(name, func(t *testing.T) { + sink := initGlobalSink() + mockClient := client.NewMockClient(t) + tc.mockExpect(mockClient) + + dynamicCfg, err := testDynamicCfg(tc.optsInputs) + require.NoError(t, err) + + provider := &hcpProviderImpl{ + hcpClient: mockClient, + cfg: dynamicCfg, + } + + provider.getUpdate(context.Background()) + + // Verify the provider returns the expected endpoint, filters and labels. + require.Equal(t, tc.expected.endpoint, provider.GetEndpoint().String()) + require.Equal(t, tc.expected.filters, provider.GetFilters().String()) + require.Equal(t, tc.expected.labels, provider.GetLabels()) + + // Verify the expected refresh metric (success or failure) was counted once. + interval := sink.Data()[0] + require.NotNil(t, interval, 1) + sv := interval.Counters[tc.metricKey] + assert.NotNil(t, sv.AggregateSample) + require.Equal(t, sv.AggregateSample.Count, 1) + }) + } +} + +// mockRaceClient is a mock HCP client that fetches TelemetryConfig. +// The mock TelemetryConfig returned can be manually updated at any time. +// It manages concurrent read/write access to config with a sync.RWMutex. +type mockRaceClient struct { + client.Client + cfg *client.TelemetryConfig + rw sync.RWMutex +} + +// updateCfg acquires a write lock and updates client config to a new value given a count.
+func (m *mockRaceClient) updateCfg(count int) (*client.TelemetryConfig, error) { + m.rw.Lock() + defer m.rw.Unlock() + + labels := map[string]string{fmt.Sprintf("label_%d", count): fmt.Sprintf("value_%d", count)} + + filters, err := regexp.Compile(fmt.Sprintf("consul_filter_%d", count)) + if err != nil { + return nil, err + } + + endpoint, err := url.Parse(fmt.Sprintf("http://consul-endpoint-%d.com", count)) + if err != nil { + return nil, err + } + + cfg := &client.TelemetryConfig{ + MetricsConfig: &client.MetricsConfig{ + Filters: filters, + Endpoint: endpoint, + Labels: labels, + }, + RefreshConfig: &client.RefreshConfig{ + RefreshInterval: testRefreshInterval, + }, + } + m.cfg = cfg + + return cfg, nil +} + +// FetchTelemetryConfig returns the current config held by the mockRaceClient. +func (m *mockRaceClient) FetchTelemetryConfig(ctx context.Context) (*client.TelemetryConfig, error) { + m.rw.RLock() + defer m.rw.RUnlock() + + return m.cfg, nil +} + +func TestTelemetryConfigProvider_Race(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initCfg, err := testTelemetryCfg(&testConfig{ + endpoint: "test.com", + filters: "test", + labels: map[string]string{"test_label": "test_value"}, + refreshInterval: testRefreshInterval, + }) + require.NoError(t, err) + + m := &mockRaceClient{ + cfg: initCfg, + } + + // Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval. + provider, err := NewHCPProvider(ctx, m, m.cfg) + require.NoError(t, err) + + for count := 0; count < testRaceWriteSampleCount; count++ { + // Force a TelemetryConfig value change in the mockRaceClient. + newCfg, err := m.updateCfg(count) + require.NoError(t, err) + // Force provider to obtain new client TelemetryConfig immediately. + // This call is necessary to guarantee TelemetryConfig changes to assert on expected values below. + provider.getUpdate(context.Background()) + + // Start goroutines to access label configuration. 
+ wg := &sync.WaitGroup{} + kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) { + require.Equal(t, provider.GetLabels(), newCfg.MetricsConfig.Labels) + }) + + // Start goroutines to access filter configuration. + kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) { + require.Equal(t, provider.GetFilters(), newCfg.MetricsConfig.Filters) + }) + + // Start goroutines to access endpoint configuration. + kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) { + require.Equal(t, provider.GetEndpoint(), newCfg.MetricsConfig.Endpoint) + }) + + wg.Wait() + } +} + +func kickOff(wg *sync.WaitGroup, count int, provider *hcpProviderImpl, check func(cfgProvider *hcpProviderImpl)) { + for i := 0; i < count; i++ { + wg.Add(1) + go func() { + defer wg.Done() + check(provider) + }() + } +} + +// initGlobalSink is a helper function that creates a Go metrics inmemsink and installs it as the global metrics sink. +func initGlobalSink() *metrics.InmemSink { + cfg := metrics.DefaultConfig(testSinkServiceName) + cfg.EnableHostname = false + + sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) + metrics.NewGlobal(cfg, sink) + + return sink +} + +// testDynamicCfg converts testConfig inputs to a dynamicConfig to be used in tests. +func testDynamicCfg(testCfg *testConfig) (*dynamicConfig, error) { + filters, err := regexp.Compile(testCfg.filters) + if err != nil { + return nil, err + } + + endpoint, err := url.Parse(testCfg.endpoint) + if err != nil { + return nil, err + } + return &dynamicConfig{ + Endpoint: endpoint, + Filters: filters, + Labels: testCfg.labels, + RefreshInterval: testCfg.refreshInterval, + }, nil +} + +// testTelemetryCfg converts testConfig inputs to a TelemetryConfig to be used in tests.
+func testTelemetryCfg(testCfg *testConfig) (*client.TelemetryConfig, error) { + filters, err := regexp.Compile(testCfg.filters) + if err != nil { + return nil, err + } + + endpoint, err := url.Parse(testCfg.endpoint) + if err != nil { + return nil, err + } + return &client.TelemetryConfig{ + MetricsConfig: &client.MetricsConfig{ + Endpoint: endpoint, + Filters: filters, + Labels: testCfg.labels, + }, + RefreshConfig: &client.RefreshConfig{ + RefreshInterval: testCfg.refreshInterval, + }, + }, nil +} diff --git a/go.mod b/go.mod index 43d103917349..f3a09080437d 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/hcl v1.0.0 github.com/hashicorp/hcp-scada-provider v0.2.3 - github.com/hashicorp/hcp-sdk-go v0.48.0 + github.com/hashicorp/hcp-sdk-go v0.55.0 github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 github.com/hashicorp/memberlist v0.5.0 github.com/hashicorp/raft v1.5.0 diff --git a/go.sum b/go.sum index fbc1617b87b3..224d20d16cd3 100644 --- a/go.sum +++ b/go.sum @@ -543,8 +543,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcp-scada-provider v0.2.3 h1:AarYR+/Pcv+cMvPdAlb92uOBmZfEH6ny4+DT+4NY2VQ= github.com/hashicorp/hcp-scada-provider v0.2.3/go.mod h1:ZFTgGwkzNv99PLQjTsulzaCplCzOTBh0IUQsPKzrQFo= -github.com/hashicorp/hcp-sdk-go v0.48.0 h1:LWpFR7YVDz4uG4C/ixcy2tRbg7/BgjMcTh1bRkKaeBQ= -github.com/hashicorp/hcp-sdk-go v0.48.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= +github.com/hashicorp/hcp-sdk-go v0.55.0 h1:T4sQtgQfQJOD0uucT4hS+GZI1FmoHAQMADj277W++xw= +github.com/hashicorp/hcp-sdk-go v0.55.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 h1:n9J0rwVWXDpNd5iZnwY7w4WZyq53/rROeI7OVvLW8Ok= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038/go.mod 
h1:n2TSygSNwsLJ76m8qFXTSc7beTb+auJxYdqrnoqwZWE= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=