From 1ac185d85cf8e33079400b359d54a821ce161ce4 Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Wed, 10 Apr 2024 14:25:12 +0100 Subject: [PATCH] Restore legacy `dapr_http_server_response_count` HTTP metric (#7662) The legacy `dapr_http_server_response_count` metric had been removed from being served. This metric was relied upon by users. Adds metric back to be served when in legacy metric mode. Should be backported and patch released in 1.13. Signed-off-by: joshvanl --- pkg/diagnostics/http_monitoring.go | 20 ++- .../framework/process/daprd/daprd.go | 40 ++++- .../framework/process/daprd/options.go | 27 ++++ .../metrics/{grpcserver.go => grpc/basic.go} | 68 +++++---- .../suite/daprd/metrics/helpers.go | 141 ------------------ .../suite/daprd/metrics/http/basic.go | 78 ++++++++++ .../daprd/metrics/http/cardinality/high.go | 86 +++++++++++ .../daprd/metrics/http/cardinality/low.go | 86 +++++++++++ .../suite/daprd/metrics/http/http.go | 18 +++ .../metrics/httpserver_defaultcardinality.go | 83 ----------- .../metrics/httpserver_highcardinality.go | 97 ------------ .../metrics/httpserver_lowcardinality.go | 97 ------------ .../suite/daprd/metrics/metrics.go | 19 +++ .../suite/daprd/metrics/workflow.go | 48 +++--- 14 files changed, 439 insertions(+), 469 deletions(-) rename tests/integration/suite/daprd/metrics/{grpcserver.go => grpc/basic.go} (56%) delete mode 100644 tests/integration/suite/daprd/metrics/helpers.go create mode 100644 tests/integration/suite/daprd/metrics/http/basic.go create mode 100644 tests/integration/suite/daprd/metrics/http/cardinality/high.go create mode 100644 tests/integration/suite/daprd/metrics/http/cardinality/low.go create mode 100644 tests/integration/suite/daprd/metrics/http/http.go delete mode 100644 tests/integration/suite/daprd/metrics/httpserver_defaultcardinality.go delete mode 100644 tests/integration/suite/daprd/metrics/httpserver_highcardinality.go delete mode 100644 tests/integration/suite/daprd/metrics/httpserver_lowcardinality.go create mode 100644 tests/integration/suite/daprd/metrics/metrics.go diff --git a/pkg/diagnostics/http_monitoring.go b/pkg/diagnostics/http_monitoring.go index 52ecfc99f7e..893f3885288 100644 --- a/pkg/diagnostics/http_monitoring.go +++ b/pkg/diagnostics/http_monitoring.go @@ -49,6 +49,7 @@ type httpMetrics struct { serverResponseBytes *stats.Int64Measure serverLatency *stats.Float64Measure serverRequestCount *stats.Int64Measure + serverResponseCount *stats.Int64Measure clientSentBytes *stats.Int64Measure clientReceivedBytes *stats.Int64Measure @@ -83,6 +84,10 @@ func newHTTPMetrics() *httpMetrics { "http/server/request_count", "Count of HTTP requests processed by the server.", stats.UnitDimensionless), + serverResponseCount: stats.Int64( + "http/server/response_count", + "The number of HTTP responses", + stats.UnitDimensionless), clientSentBytes: stats.Int64( "http/client/sent_bytes", "Total bytes sent in request body (not including headers)", @@ -130,6 +135,10 @@ func (h *httpMetrics) ServerRequestCompleted(ctx context.Context, method, path, ctx, diagUtils.WithTags(h.serverLatency.Name(), appIDKey, h.appID, httpMethodKey, method, httpPathKey, path, httpStatusCodeKey, status), h.serverLatency.M(elapsed)) + stats.RecordWithTags( + ctx, + diagUtils.WithTags(h.serverResponseCount.Name(), appIDKey, h.appID, httpPathKey, path, httpMethodKey, method, httpStatusCodeKey, status), + h.serverResponseCount.M(1)) } else { stats.RecordWithTags( ctx, @@ -234,7 +243,8 @@ func (h *httpMetrics) Init(appID string, legacy bool) error { serverTags = []tag.Key{appIDKey, httpMethodKey, httpStatusCodeKey} clientTags = []tag.Key{appIDKey, httpStatusCodeKey} } - return view.Register( + + views := []*view.View{ diagUtils.NewMeasureView(h.serverRequestBytes, tags, defaultSizeDistribution), diagUtils.NewMeasureView(h.serverResponseBytes, tags, defaultSizeDistribution), diagUtils.NewMeasureView(h.serverLatency, serverTags, defaultLatencyDistribution), @@ -245,7 +255,13 @@ func (h *httpMetrics) Init(appID string, legacy bool) error { diagUtils.NewMeasureView(h.clientCompletedCount, clientTags, view.Count()), diagUtils.NewMeasureView(h.healthProbeRoundripLatency, []tag.Key{appIDKey, httpStatusCodeKey}, defaultLatencyDistribution), diagUtils.NewMeasureView(h.healthProbeCompletedCount, []tag.Key{appIDKey, httpStatusCodeKey}, view.Count()), - ) + } + + if h.legacy { + views = append(views, diagUtils.NewMeasureView(h.serverResponseCount, serverTags, view.Count())) + } + + return view.Register(views...) } // HTTPMiddleware is the middleware to track HTTP server-side requests. diff --git a/tests/integration/framework/process/daprd/daprd.go b/tests/integration/framework/process/daprd/daprd.go index 610cdbac3b3..1316f2ef15a 100644 --- a/tests/integration/framework/process/daprd/daprd.go +++ b/tests/integration/framework/process/daprd/daprd.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net" "net/http" "os" @@ -288,14 +289,14 @@ func (d *Daprd) Namespace() string { return d.namespace } -func (d *Daprd) AppPort() int { - return d.appPort -} - func (d *Daprd) ipPort(port int) string { return "127.0.0.1:" + strconv.Itoa(port) } +func (d *Daprd) AppPort() int { + return d.appPort +} + func (d *Daprd) AppAddress() string { return d.ipPort(d.AppPort()) } @@ -370,3 +371,34 @@ func (d *Daprd) Metrics(t *testing.T, ctx context.Context) map[string]float64 { return metrics } + +func (d *Daprd) HTTPGet2xx(t *testing.T, ctx context.Context, path string) { + t.Helper() + d.http2xx(t, ctx, http.MethodGet, path, nil) +} + +func (d *Daprd) HTTPPost2xx(t *testing.T, ctx context.Context, path string, body io.Reader, headers ...string) { + t.Helper() + d.http2xx(t, ctx, http.MethodPost, path, body, headers...) +} + +func (d *Daprd) http2xx(t *testing.T, ctx context.Context, method, path string, body io.Reader, headers ...string) { + t.Helper() + + require.Zero(t, len(headers)%2, "headers must be key-value pairs") + + path = strings.TrimPrefix(path, "/") + url := fmt.Sprintf("http://%s/%s", d.HTTPAddress(), path) + req, err := http.NewRequestWithContext(ctx, method, url, body) + require.NoError(t, err) + + for i := 0; i < len(headers); i += 2 { + req.Header.Set(headers[i], headers[i+1]) + } + + resp, err := d.httpClient.Do(req) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.GreaterOrEqual(t, resp.StatusCode, 200, "expected 2xx status code") + require.Less(t, resp.StatusCode, 300, "expected 2xx status code") +} diff --git a/tests/integration/framework/process/daprd/options.go b/tests/integration/framework/process/daprd/options.go index e112315306b..73e0bf676af 100644 --- a/tests/integration/framework/process/daprd/options.go +++ b/tests/integration/framework/process/daprd/options.go @@ -14,6 +14,9 @@ limitations under the License. package daprd import ( + "fmt" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/require" @@ -168,6 +171,17 @@ func WithResourceFiles(files ...string) Option { } } +func WithInMemoryStateStore(storeName string) Option { + return WithResourceFiles(`apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: ` + storeName + ` +spec: + type: state.in-memory + version: v1 +`) +} + // WithInMemoryActorStateStore adds an in-memory state store component, which is also enabled as actor state store. func WithInMemoryActorStateStore(storeName string) Option { return WithResourceFiles(`apiVersion: dapr.io/v1alpha1 @@ -195,6 +209,19 @@ func WithConfigs(configs ...string) Option { } } +func WithConfigManifests(t *testing.T, manifests ...string) Option { + configs := make([]string, len(manifests)) + for i, manifest := range manifests { + f := filepath.Join(t.TempDir(), fmt.Sprintf("config-%d.yaml", i)) + require.NoError(t, os.WriteFile(f, []byte(manifest), 0o600)) + configs[i] = f + } + + return func(o *options) { + o.configs = append(o.configs, configs...) + } +} + func WithPlacementAddresses(addresses ...string) Option { return func(o *options) { o.placementAddresses = addresses diff --git a/tests/integration/suite/daprd/metrics/grpcserver.go b/tests/integration/suite/daprd/metrics/grpc/basic.go similarity index 56% rename from tests/integration/suite/daprd/metrics/grpcserver.go rename to tests/integration/suite/daprd/metrics/grpc/basic.go index 99d3ae0539b..fddb2b7101f 100644 --- a/tests/integration/suite/daprd/metrics/grpcserver.go +++ b/tests/integration/suite/daprd/metrics/grpc/basic.go @@ -11,81 +11,95 @@ See the License for the specific language governing permissions and limitations under the License. */ -package metrics +package grpc import ( "context" + "fmt" + "net/http" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1" - runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" + commonv1 "github.com/dapr/dapr/pkg/proto/common/v1" + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/http/app" "github.com/dapr/dapr/tests/integration/suite" ) func init() { - suite.Register(new(grpcServer)) + suite.Register(new(basic)) } -// grpcServer tests daprd metrics for the gRPC server -type grpcServer struct { - base +// basic tests daprd metrics for the gRPC server +type basic struct { + daprd *daprd.Daprd } -func (m *grpcServer) Setup(t *testing.T) []framework.Option { - return m.testSetup(t) +func (b *basic) Setup(t *testing.T) []framework.Option { + app := app.New(t, + app.WithHandlerFunc("/hi", func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprint(w, "OK") + }), + ) + + b.daprd = daprd.New(t, + daprd.WithAppPort(app.Port()), + daprd.WithAppProtocol("http"), + daprd.WithAppID("myapp"), + daprd.WithInMemoryStateStore("mystore"), + ) + + return []framework.Option{ + framework.WithProcesses(app, b.daprd), + } } -func (m *grpcServer) Run(t *testing.T, ctx context.Context) { - m.beforeRun(t, ctx) +func (b *basic) Run(t *testing.T, ctx context.Context) { + b.daprd.WaitUntilRunning(t, ctx) - t.Run("service invocation", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) + client := b.daprd.GRPCClient(t, ctx) + t.Run("service invocation", func(t *testing.T) { // Invoke - _, err := m.grpcClient.InvokeService(reqCtx, &runtimev1pb.InvokeServiceRequest{ + _, err := client.InvokeService(ctx, &rtv1.InvokeServiceRequest{ Id: "myapp", - Message: &commonv1pb.InvokeRequest{ + Message: &commonv1.InvokeRequest{ Method: "hi", - HttpExtension: &commonv1pb.HTTPExtension{ - Verb: commonv1pb.HTTPExtension_GET, + HttpExtension: &commonv1.HTTPExtension{ + Verb: commonv1.HTTPExtension_GET, }, }, }) require.NoError(t, err) // Verify metrics - metrics := m.getMetrics(t, ctx) + metrics := b.daprd.Metrics(t, ctx) assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/InvokeService|grpc_server_status:OK"])) }) t.Run("state stores", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) - // Write state - _, err := m.grpcClient.SaveState(reqCtx, &runtimev1pb.SaveStateRequest{ + _, err := client.SaveState(ctx, &rtv1.SaveStateRequest{ StoreName: "mystore", - States: []*commonv1pb.StateItem{ + States: []*commonv1.StateItem{ {Key: "myvalue", Value: []byte(`"hello world"`)}, }, }) require.NoError(t, err) // Get state - _, err = m.grpcClient.GetState(reqCtx, &runtimev1pb.GetStateRequest{ + _, err = client.GetState(ctx, &rtv1.GetStateRequest{ StoreName: "mystore", Key: "myvalue", }) require.NoError(t, err) // Verify metrics - metrics := m.getMetrics(t, ctx) + metrics := b.daprd.Metrics(t, ctx) assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/SaveState|grpc_server_status:OK"])) assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/GetState|grpc_server_status:OK"])) }) diff --git a/tests/integration/suite/daprd/metrics/helpers.go b/tests/integration/suite/daprd/metrics/helpers.go deleted file mode 100644 index e7580a80666..00000000000 --- a/tests/integration/suite/daprd/metrics/helpers.go +++ /dev/null @@ -1,141 +0,0 @@ -/* -Copyright 2024 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package metrics - -import ( - "context" - "fmt" - "io" - "net/http" - "testing" - "time" - - "github.com/prometheus/common/expfmt" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" - "github.com/dapr/dapr/tests/integration/framework" - procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd" - prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" - "github.com/dapr/dapr/tests/integration/framework/process/placement" - "github.com/dapr/dapr/tests/integration/framework/util" -) - -// Base struct for metrics tests -type base struct { - daprd *procdaprd.Daprd - httpClient *http.Client - grpcClient runtimev1pb.DaprClient - grpcConn *grpc.ClientConn - place *placement.Placement -} - -func (m *base) testSetup(t *testing.T, daprdOpts ...procdaprd.Option) []framework.Option { - handler := http.NewServeMux() - handler.HandleFunc("/hi", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprint(w, "OK") - }) - handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("")) - }) - srv := prochttp.New(t, prochttp.WithHandler(handler)) - m.place = placement.New(t) - - daprdOpts = append(daprdOpts, - procdaprd.WithAppID("myapp"), - procdaprd.WithAppPort(srv.Port()), - procdaprd.WithAppProtocol("http"), - procdaprd.WithPlacementAddresses(m.place.Address()), - procdaprd.WithInMemoryActorStateStore("mystore"), - ) - m.daprd = procdaprd.New(t, daprdOpts...) - - return []framework.Option{ - framework.WithProcesses(m.place, srv, m.daprd), - } -} - -func (m *base) beforeRun(t *testing.T, ctx context.Context) { - m.place.WaitUntilRunning(t, ctx) - m.daprd.WaitUntilRunning(t, ctx) - - m.httpClient = util.HTTPClient(t) - - var err error - m.grpcConn, err = grpc.DialContext(ctx, - m.daprd.GRPCAddress(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - ) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, m.grpcConn.Close()) }) - m.grpcClient = runtimev1pb.NewDaprClient(m.grpcConn) - - t.Logf("Metrics URL: http://localhost:%d/metrics", m.daprd.MetricsPort()) -} - -// Returns a subset of metrics scraped from the metrics endpoint -func (m *base) getMetrics(t *testing.T, ctx context.Context) map[string]float64 { - t.Helper() - - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - defer reqCancel() - - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/metrics", m.daprd.MetricsPort()), nil) - require.NoError(t, err) - - // Body is closed below but the linter isn't seeing that - //nolint:bodyclose - res, err := m.httpClient.Do(req) - require.NoError(t, err) - defer closeBody(t, res.Body) - require.Equal(t, http.StatusOK, res.StatusCode) - - // Extract the metrics - parser := expfmt.TextParser{} - metricFamilies, err := parser.TextToMetricFamilies(res.Body) - require.NoError(t, err) - - metrics := make(map[string]float64) - for _, mf := range metricFamilies { - for _, m := range mf.GetMetric() { - key := mf.GetName() - for _, l := range m.GetLabel() { - key += "|" + l.GetName() + ":" + l.GetValue() - } - metrics[key] = m.GetCounter().GetValue() - } - } - - return metrics -} - -func (m *base) doRequest(t *testing.T, req *http.Request) { - t.Helper() - - // Body is closed below but the linter isn't seeing that - //nolint:bodyclose - res, err := m.httpClient.Do(req) - require.NoError(t, err) - defer closeBody(t, res.Body) - require.True(t, res.StatusCode >= 200 && res.StatusCode <= 299) -} - -// Drain body before closing -func closeBody(t *testing.T, body io.ReadCloser) { - _, err := io.Copy(io.Discard, body) - require.NoError(t, err) -} diff --git a/tests/integration/suite/daprd/metrics/http/basic.go b/tests/integration/suite/daprd/metrics/http/basic.go new file mode 100644 index 00000000000..0235fff97a0 --- /dev/null +++ b/tests/integration/suite/daprd/metrics/http/basic.go @@ -0,0 +1,78 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package http + +import ( + "context" + "fmt" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/http/app" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(basic)) +} + +// basic tests daprd metrics for the HTTP server +type basic struct { + daprd *daprd.Daprd +} + +func (b *basic) Setup(t *testing.T) []framework.Option { + app := app.New(t, + app.WithHandlerFunc("/hi", func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprint(w, "OK") + }), + ) + + b.daprd = daprd.New(t, + daprd.WithAppPort(app.Port()), + daprd.WithAppProtocol("http"), + daprd.WithAppID("myapp"), + daprd.WithInMemoryStateStore("mystore"), + ) + + return []framework.Option{ + framework.WithProcesses(app, b.daprd), + } +} + +func (b *basic) Run(t *testing.T, ctx context.Context) { + b.daprd.WaitUntilRunning(t, ctx) + + t.Run("service invocation", func(t *testing.T) { + b.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/hi") + metrics := b.daprd.Metrics(t, ctx) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200"])) + }) + + t.Run("state stores", func(t *testing.T) { + body := `[{"key":"myvalue", "value":"hello world"}]` + b.daprd.HTTPPost2xx(t, ctx, "/v1.0/state/mystore", strings.NewReader(body), "content-type", "application/json") + + b.daprd.HTTPGet2xx(t, ctx, "/v1.0/state/mystore/myvalue") + + metrics := b.daprd.Metrics(t, ctx) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"])) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"])) + }) +} diff --git a/tests/integration/suite/daprd/metrics/http/cardinality/high.go b/tests/integration/suite/daprd/metrics/http/cardinality/high.go new file mode 100644 index 00000000000..7bdbd32ad11 --- /dev/null +++ b/tests/integration/suite/daprd/metrics/http/cardinality/high.go @@ -0,0 +1,86 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cardinality + +import ( + "context" + "fmt" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/http/app" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(high)) +} + +// high tests daprd metrics for the HTTP server configured with high cardinality +type high struct { + daprd *daprd.Daprd +} + +func (h *high) Setup(t *testing.T) []framework.Option { + app := app.New(t, + app.WithHandlerFunc("/hi", func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprint(w, "OK") + }), + ) + + h.daprd = daprd.New(t, + daprd.WithAppPort(app.Port()), + daprd.WithAppProtocol("http"), + daprd.WithAppID("myapp"), + daprd.WithInMemoryStateStore("mystore"), + daprd.WithConfigManifests(t, ` +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: lowcardinality +spec: + metrics: + http: + increasedCardinality: true +`), + ) + + return []framework.Option{ + framework.WithProcesses(app, h.daprd), + } +} + +func (h *high) Run(t *testing.T, ctx context.Context) { + h.daprd.WaitUntilRunning(t, ctx) + + t.Run("service invocation", func(t *testing.T) { + h.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/hi") + metrics := h.daprd.Metrics(t, ctx) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200"])) + }) + + t.Run("state stores", func(t *testing.T) { + body := `[{"key":"myvalue", "value":"hello world"}]` + h.daprd.HTTPPost2xx(t, ctx, "/v1.0/state/mystore", strings.NewReader(body), "content-type", "application/json") + h.daprd.HTTPGet2xx(t, ctx, "/v1.0/state/mystore/myvalue") + metrics := h.daprd.Metrics(t, ctx) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"])) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"])) + }) +} diff --git a/tests/integration/suite/daprd/metrics/http/cardinality/low.go b/tests/integration/suite/daprd/metrics/http/cardinality/low.go new file mode 100644 index 00000000000..0e1712894a2 --- /dev/null +++ b/tests/integration/suite/daprd/metrics/http/cardinality/low.go @@ -0,0 +1,86 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cardinality + +import ( + "context" + "fmt" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/http/app" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(low)) +} + +// low tests daprd metrics for the HTTP server configured with low cardinality +type low struct { + daprd *daprd.Daprd +} + +func (l *low) Setup(t *testing.T) []framework.Option { + app := app.New(t, + app.WithHandlerFunc("/hi", func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprint(w, "OK") + }), + ) + + l.daprd = daprd.New(t, + daprd.WithAppPort(app.Port()), + daprd.WithAppProtocol("http"), + daprd.WithAppID("myapp"), + daprd.WithInMemoryStateStore("mystore"), + daprd.WithConfigManifests(t, ` +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: lowcardinality +spec: + metrics: + http: + increasedCardinality: false +`), + ) + + return []framework.Option{ + framework.WithProcesses(app, l.daprd), + } +} + +func (l *low) Run(t *testing.T, ctx context.Context) { + l.daprd.WaitUntilRunning(t, ctx) + + t.Run("service invocation", func(t *testing.T) { + l.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/hi") + metrics := l.daprd.Metrics(t, ctx) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:InvokeService/myapp|status:200"])) + }) + + t.Run("state stores", func(t *testing.T) { + body := `[{"key":"myvalue", "value":"hello world"}]` + l.daprd.HTTPPost2xx(t, ctx, "/v1.0/state/mystore", strings.NewReader(body), "content-type", "application/json") + l.daprd.HTTPGet2xx(t, ctx, "/v1.0/state/mystore/myvalue") + metrics := l.daprd.Metrics(t, ctx) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:SaveState|status:204"])) + assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GetState|status:200"])) + }) +} diff --git a/tests/integration/suite/daprd/metrics/http/http.go b/tests/integration/suite/daprd/metrics/http/http.go new file mode 100644 index 00000000000..344aad27c52 --- /dev/null +++ b/tests/integration/suite/daprd/metrics/http/http.go @@ -0,0 +1,18 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package http + +import ( + _ "github.com/dapr/dapr/tests/integration/suite/daprd/metrics/http/cardinality" +) diff --git a/tests/integration/suite/daprd/metrics/httpserver_defaultcardinality.go b/tests/integration/suite/daprd/metrics/httpserver_defaultcardinality.go deleted file mode 100644 index e7285491775..00000000000 --- a/tests/integration/suite/daprd/metrics/httpserver_defaultcardinality.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Copyright 2024 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package metrics - -import ( - "context" - "fmt" - "net/http" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dapr/dapr/tests/integration/framework" - "github.com/dapr/dapr/tests/integration/suite" -) - -func init() { - suite.Register(new(httpServerDefaultCardinality)) -} - -// httpServerDefaultCardinality tests daprd metrics for the HTTP server configured with the default cardinality -// TODO @ItalyPaleAle [MetricsCardinality] Change default in 1.14 to be based on low cardinality -type httpServerDefaultCardinality struct { - base -} - -func (m *httpServerDefaultCardinality) Setup(t *testing.T) []framework.Option { - return m.testSetup(t) -} - -func (m *httpServerDefaultCardinality) Run(t *testing.T, ctx context.Context) { - m.beforeRun(t, ctx) - - t.Run("service invocation", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) - - // Invoke - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/v1.0/invoke/myapp/method/hi", m.daprd.HTTPPort()), nil) - require.NoError(t, err) - m.doRequest(t, req) - - // Verify metrics - metrics := m.getMetrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200"])) - }) - - t.Run("state stores", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) - - // Write state - body := `[{"key":"myvalue", "value":"hello world"}]` - req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, fmt.Sprintf("http://localhost:%d/v1.0/state/mystore", m.daprd.HTTPPort()), strings.NewReader(body)) - require.NoError(t, err) - req.Header.Set("content-type", "application/json") - m.doRequest(t, req) - - // Get state - req, err = http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/v1.0/state/mystore/myvalue", m.daprd.HTTPPort()), nil) - require.NoError(t, err) - m.doRequest(t, req) - - // Verify metrics - metrics := m.getMetrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"])) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"])) - }) -} diff --git a/tests/integration/suite/daprd/metrics/httpserver_highcardinality.go b/tests/integration/suite/daprd/metrics/httpserver_highcardinality.go deleted file mode 100644 index c5571f4d188..00000000000 --- a/tests/integration/suite/daprd/metrics/httpserver_highcardinality.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Copyright 2024 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package metrics - -import ( - "context" - "fmt" - "net/http" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dapr/dapr/tests/integration/framework" - procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd" - "github.com/dapr/dapr/tests/integration/suite" -) - -func init() { - suite.Register(new(httpServerHighCardinality)) -} - -// httpServerHighCardinality tests daprd metrics for the HTTP server configured with high cardinality -type httpServerHighCardinality struct { - base -} - -func (m *httpServerHighCardinality) Setup(t *testing.T) []framework.Option { - configFile := filepath.Join(t.TempDir(), "config.yaml") - require.NoError(t, os.WriteFile(configFile, []byte(` -apiVersion: dapr.io/v1alpha1 -kind: Configuration -metadata: - name: lowcardinality -spec: - metrics: - http: - increasedCardinality: true -`), 0o600)) - - return m.testSetup(t, procdaprd.WithConfigs(configFile)) -} - -func (m *httpServerHighCardinality) Run(t *testing.T, ctx context.Context) { - m.beforeRun(t, ctx) - - t.Run("service invocation", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) - - // Invoke - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/v1.0/invoke/myapp/method/hi", m.daprd.HTTPPort()), nil) - require.NoError(t, err) - m.doRequest(t, req) - - // Verify metrics - metrics := m.getMetrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200"])) - }) - - t.Run("state stores", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) - - // Write state - body := `[{"key":"myvalue", "value":"hello world"}]` - req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, fmt.Sprintf("http://localhost:%d/v1.0/state/mystore", m.daprd.HTTPPort()), strings.NewReader(body)) - require.NoError(t, err) - req.Header.Set("content-type", "application/json") - m.doRequest(t, req) - - // Get state - req, err = http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/v1.0/state/mystore/myvalue", m.daprd.HTTPPort()), nil) - require.NoError(t, err) - m.doRequest(t, req) - - // Verify metrics - metrics := m.getMetrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"])) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"])) - }) -} diff --git a/tests/integration/suite/daprd/metrics/httpserver_lowcardinality.go b/tests/integration/suite/daprd/metrics/httpserver_lowcardinality.go deleted file mode 100644 index 3396b2ca6f2..00000000000 --- a/tests/integration/suite/daprd/metrics/httpserver_lowcardinality.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Copyright 2024 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package metrics - -import ( - "context" - "fmt" - "net/http" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dapr/dapr/tests/integration/framework" - procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd" - "github.com/dapr/dapr/tests/integration/suite" -) - -func init() { - suite.Register(new(httpServerLowCardinality)) -} - -// httpServerLowCardinality tests daprd metrics for the HTTP server configured with low cardinality -type httpServerLowCardinality struct { - base -} - -func (m *httpServerLowCardinality) Setup(t *testing.T) []framework.Option { - configFile := filepath.Join(t.TempDir(), "config.yaml") - require.NoError(t, os.WriteFile(configFile, []byte(` -apiVersion: dapr.io/v1alpha1 -kind: Configuration -metadata: - name: lowcardinality -spec: - metrics: - http: - increasedCardinality: false -`), 0o600)) - - return m.testSetup(t, procdaprd.WithConfigs(configFile)) -} - -func (m *httpServerLowCardinality) Run(t *testing.T, ctx context.Context) { - m.beforeRun(t, ctx) - - t.Run("service invocation", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) - - // Invoke - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/v1.0/invoke/myapp/method/hi", m.daprd.HTTPPort()), nil) - require.NoError(t, err) - m.doRequest(t, req) - - // Verify metrics - metrics := m.getMetrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:InvokeService/myapp|status:200"])) - }) - - t.Run("state stores", func(t *testing.T) { - reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(reqCancel) - - // Write state - body := `[{"key":"myvalue", "value":"hello world"}]` - req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, fmt.Sprintf("http://localhost:%d/v1.0/state/mystore", m.daprd.HTTPPort()), strings.NewReader(body)) - require.NoError(t, err) - req.Header.Set("content-type", "application/json") - m.doRequest(t, req) - - // Get state - req, err = http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/v1.0/state/mystore/myvalue", m.daprd.HTTPPort()), nil) - require.NoError(t, err) - m.doRequest(t, req) - - // Verify metrics - metrics := m.getMetrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:SaveState|status:204"])) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GetState|status:200"])) - }) -} diff --git a/tests/integration/suite/daprd/metrics/metrics.go b/tests/integration/suite/daprd/metrics/metrics.go new file mode 100644 index 00000000000..28066d38e4b --- /dev/null +++ b/tests/integration/suite/daprd/metrics/metrics.go @@ -0,0 +1,19 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + _ "github.com/dapr/dapr/tests/integration/suite/daprd/metrics/grpc" + _ "github.com/dapr/dapr/tests/integration/suite/daprd/metrics/http" +) diff --git a/tests/integration/suite/daprd/metrics/workflow.go b/tests/integration/suite/daprd/metrics/workflow.go index dd222577b4b..23233babece 100644 --- a/tests/integration/suite/daprd/metrics/workflow.go +++ b/tests/integration/suite/daprd/metrics/workflow.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/microsoft/durabletask-go/api" "github.com/microsoft/durabletask-go/backend" @@ -27,6 +26,9 @@ import ( "github.com/stretchr/testify/require" "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/http/app" + "github.com/dapr/dapr/tests/integration/framework/process/placement" "github.com/dapr/dapr/tests/integration/suite" ) @@ -36,15 +38,31 @@ func init() { // workflow tests daprd metrics for workflows type workflow struct { - base + daprd *daprd.Daprd + place *placement.Placement } -func (m *workflow) Setup(t *testing.T) []framework.Option { - return m.testSetup(t) +func (w *workflow) Setup(t *testing.T) []framework.Option { + w.place = placement.New(t) + + app := app.New(t) + + w.daprd = daprd.New(t, + daprd.WithAppPort(app.Port()), + daprd.WithAppProtocol("http"), + daprd.WithAppID("myapp"), + daprd.WithPlacementAddresses(w.place.Address()), + daprd.WithInMemoryActorStateStore("mystore"), + ) + + return []framework.Option{ + framework.WithProcesses(w.place, app, w.daprd), + } } -func (m *workflow) Run(t *testing.T, ctx context.Context) { - m.beforeRun(t, ctx) +func (w *workflow) Run(t *testing.T, ctx context.Context) { + w.place.WaitUntilRunning(t, ctx) + w.daprd.WaitUntilRunning(t, ctx) // Register workflow r := task.NewTaskRegistry() @@ -66,22 +84,18 @@ func (m *workflow) Run(t *testing.T, ctx context.Context) { } return nil, nil }) - taskhubClient := client.NewTaskHubGrpcClient(m.grpcConn, backend.DefaultLogger()) - taskhubCtx, cancelTaskhub := context.WithCancel(ctx) - taskhubClient.StartWorkItemListener(taskhubCtx, r) - defer cancelTaskhub() + taskhubClient := client.NewTaskHubGrpcClient(w.daprd.GRPCConn(t, ctx), backend.DefaultLogger()) + taskhubClient.StartWorkItemListener(ctx, r) t.Run("successful workflow execution", func(t *testing.T) { id, err := taskhubClient.ScheduleNewOrchestration(ctx, "workflow", api.WithInput("activity_success")) require.NoError(t, err) - timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second) - t.Cleanup(cancelTimeout) - metadata, err := taskhubClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true)) + metadata, err := taskhubClient.WaitForOrchestrationCompletion(ctx, id, api.WithFetchPayloads(true)) require.NoError(t, err) assert.True(t, metadata.IsComplete()) // Verify metrics - metrics := m.getMetrics(t, ctx) + metrics := w.daprd.Metrics(t, ctx) assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_operation_count|app_id:myapp|namespace:|operation:create_workflow|status:success"])) assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_execution_count|app_id:myapp|namespace:|status:success|workflow_name:workflow"])) assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_activity_execution_count|activity_name:activity_success|app_id:myapp|namespace:|status:success"])) @@ -91,14 +105,12 @@ func (m *workflow) Run(t *testing.T, ctx context.Context) { t.Run("failed workflow execution", func(t *testing.T) { id, err := taskhubClient.ScheduleNewOrchestration(ctx, "workflow", api.WithInput("activity_failure")) require.NoError(t, err) - timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second) - t.Cleanup(cancelTimeout) - metadata, err := taskhubClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true)) + metadata, err := taskhubClient.WaitForOrchestrationCompletion(ctx, id, api.WithFetchPayloads(true)) require.NoError(t, err) assert.True(t, metadata.IsComplete()) // Verify metrics - metrics := m.getMetrics(t, ctx) + metrics := w.daprd.Metrics(t, ctx) assert.Equal(t, 2, int(metrics["dapr_runtime_workflow_operation_count|app_id:myapp|namespace:|operation:create_workflow|status:success"])) assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_execution_count|app_id:myapp|namespace:|status:failed|workflow_name:workflow"])) assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_activity_execution_count|activity_name:activity_failure|app_id:myapp|namespace:|status:failed"]))