diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index 76708301bcd41..16c17b368d883 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -3,6 +3,7 @@ package client import ( "bytes" "context" + "crypto/tls" "fmt" "io" "net/http" @@ -14,11 +15,11 @@ import ( colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" - "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/version" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-retryablehttp" + hcpcfg "github.com/hashicorp/hcp-sdk-go/config" ) const ( @@ -26,29 +27,50 @@ const ( defaultStreamTimeout = 15 * time.Second // Retry config - defaultRetryWaitMin = 15 * time.Second + defaultRetryWaitMin = 1 * time.Second defaultRetryWaitMax = 15 * time.Second defaultRetryMax = 4 ) +// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway. type MetricsClient interface { ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error } +// hcpConfig represents HCP config for TLS abstracted in an interface for easy testing. +type hcpConfig interface { + oauth2.TokenSource + APITLSConfig() *tls.Config +} + +// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing. +type cloudConfig interface { + HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpConfig, error) +} + +// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle. +// It also holds default HTTP headers to add to export requests. type otlpClient struct { client *retryablehttp.Client headers map[string]string } +// TelemetryClientCfg is used to configure the MetricsClient. type TelemetryClientCfg struct { - cloudCfg config.CloudConfig - logger hclog.Logger + CloudCfg cloudConfig + Logger hclog.Logger } -func NewMetricsClient(cfg TelemetryClientCfg) (MetricsClient, error) { - c, err := newHTTPClient(cfg.cloudCfg, cfg.logger) +// NewMetricsClient returns a configured MetricsClient. +// The current implementation uses otlpClient to provide retry functionality. +func NewMetricsClient(cfg *TelemetryClientCfg) (MetricsClient, error) { + if cfg.CloudCfg == nil || cfg.Logger == nil { + return nil, fmt.Errorf("failed to init telemetry client: provide valid TelemetryClientCfg") + } + + c, err := newHTTPClient(cfg.CloudCfg, cfg.Logger) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to init telemetry client: %v", err) } headers := map[string]string{ @@ -62,7 +84,8 @@ func NewMetricsClient(cfg TelemetryClientCfg) (MetricsClient, error) { }, nil } -func newHTTPClient(cloudCfg config.CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) { +// newHTTPClient configures the retryable HTTP client. +func newHTTPClient(cloudCfg cloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) { hcpCfg, err := cloudCfg.HCPConfig() if err != nil { return nil, err @@ -83,7 +106,7 @@ func newHTTPClient(cloudCfg config.CloudConfig, logger hclog.Logger) (*retryable retryClient := &retryablehttp.Client{ HTTPClient: client, - Logger: logger, + Logger: logger.Named("hcp_telemetry_client"), RetryWaitMin: defaultRetryWaitMin, RetryWaitMax: defaultRetryWaitMax, RetryMax: defaultRetryMax, @@ -94,6 +117,9 @@ func newHTTPClient(cloudCfg config.CloudConfig, logger hclog.Logger) (*retryable return retryClient, nil } +// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint. +// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config. +// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request. func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { pbRequest := &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, @@ -101,12 +127,12 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R body, err := proto.Marshal(pbRequest) if err != nil { - return err + return fmt.Errorf("failed to export metrics: %v", err) } req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body)) if err != nil { - return err + return fmt.Errorf("failed to export metrics: %v", err) } for k, v := range o.headers { @@ -115,23 +141,23 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R resp, err := o.client.Do(req) if err != nil { - return err + return fmt.Errorf("failed to export metrics: %v", err) } var respData bytes.Buffer if _, err := io.Copy(&respData, resp.Body); err != nil { - return err + return fmt.Errorf("failed to export metrics: %v", err) } if respData.Len() != 0 { var respProto colmetricpb.ExportMetricsServiceResponse if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { - return err + return fmt.Errorf("failed to export metrics: %v", err) } if respProto.PartialSuccess != nil { msg := respProto.PartialSuccess.GetErrorMessage() - return fmt.Errorf("failed to upload metrics: partial success: %s", msg) + return fmt.Errorf("failed to export metrics: partial success: %s", msg) } } diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go new file mode 100644 index 0000000000000..46467e11ce707 --- /dev/null +++ b/agent/hcp/client/metrics_client_test.go @@ -0,0 +1,155 @@ +package client + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "golang.org/x/oauth2" + "google.golang.org/protobuf/proto" + + colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" + + "github.com/hashicorp/consul/version" + "github.com/hashicorp/go-hclog" + hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + "github.com/stretchr/testify/require" +) + +type mockHCPCfg struct{} + +func (m *mockHCPCfg) APITLSConfig() *tls.Config { + return nil +} + +func (m *mockHCPCfg) Token() (*oauth2.Token, error) { + return &oauth2.Token{ + AccessToken: "test-token", + }, nil +} + +type mockCloudCfg struct{} + +func (m mockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpConfig, error) { + return &mockHCPCfg{}, nil +} + +type mockErrCloudCfg struct{} + +func (m mockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpConfig, error) { + return nil, errors.New("test bad HCP config") +} + +func TestNewMetricsClient(t *testing.T) { + for name, test := range map[string]struct { + wantErr string + cfg *TelemetryClientCfg + }{ + "success": { + cfg: &TelemetryClientCfg{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + CloudCfg: &mockCloudCfg{}, + }, + }, + "failsWithoutCloudCfg": { + wantErr: "failed to init telemetry client", + cfg: &TelemetryClientCfg{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + CloudCfg: nil, + }, + }, + "failsWithoutLogger": { + wantErr: "failed to init telemetry client", + cfg: &TelemetryClientCfg{ + Logger: nil, + CloudCfg: &mockErrCloudCfg{}, + }, + }, + "failsHCPConfig": { + wantErr: "failed to init telemetry client", + cfg: &TelemetryClientCfg{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + CloudCfg: &mockErrCloudCfg{}, + }, + }, + } { + t.Run(name, func(t *testing.T) { + client, err := NewMetricsClient(test.cfg) + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.Nil(t, err) + require.NotNil(t, client) + }) + } +} + +func TestExportMetrics(t *testing.T) { + for name, test := range map[string]struct { + wantErr string + status int + }{ + "success": { + status: http.StatusOK, + }, + "failsWithNonRetryableError": { + status: http.StatusBadRequest, + wantErr: "failed to export metrics", + }, + } { + t.Run(name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf") + + require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") + require.Equal(t, r.Header.Get("X-HCP-Source-Channel"), fmt.Sprintf("consul %s hcp-go-sdk/%s", version.GetHumanVersion(), version.Version)) + + body := colpb.ExportMetricsServiceResponse{} + + if test.wantErr != "" { + body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{ + ErrorMessage: "partial failure", + } + } + bytes, err := proto.Marshal(&body) + + require.NoError(t, err) + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(test.status) + w.Write(bytes) + })) + defer srv.Close() + + cfg := &TelemetryClientCfg{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + CloudCfg: mockCloudCfg{}, + } + + client, err := NewMetricsClient(cfg) + require.NoError(t, err) + + ctx := context.Background() + metrics := &metricpb.ResourceMetrics{} + err = client.ExportMetrics(ctx, metrics, srv.URL) + + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.NoError(t, err) + }) + } + +}