Skip to content

Commit

Permalink
Restore legacy dapr_http_server_response_count HTTP metric (dapr#7662)
Browse files Browse the repository at this point in the history
The legacy `dapr_http_server_response_count` metric had been removed
from being served. This metric was relied upon by users.

Adds metric back to be served when in legacy metric mode. Should be
backported and patch released in 1.13.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL authored Apr 10, 2024
1 parent f09b193 commit 1ac185d
Show file tree
Hide file tree
Showing 14 changed files with 439 additions and 469 deletions.
20 changes: 18 additions & 2 deletions pkg/diagnostics/http_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type httpMetrics struct {
serverResponseBytes *stats.Int64Measure
serverLatency *stats.Float64Measure
serverRequestCount *stats.Int64Measure
serverResponseCount *stats.Int64Measure

clientSentBytes *stats.Int64Measure
clientReceivedBytes *stats.Int64Measure
Expand Down Expand Up @@ -83,6 +84,10 @@ func newHTTPMetrics() *httpMetrics {
"http/server/request_count",
"Count of HTTP requests processed by the server.",
stats.UnitDimensionless),
serverResponseCount: stats.Int64(
"http/server/response_count",
"The number of HTTP responses",
stats.UnitDimensionless),
clientSentBytes: stats.Int64(
"http/client/sent_bytes",
"Total bytes sent in request body (not including headers)",
Expand Down Expand Up @@ -130,6 +135,10 @@ func (h *httpMetrics) ServerRequestCompleted(ctx context.Context, method, path,
ctx,
diagUtils.WithTags(h.serverLatency.Name(), appIDKey, h.appID, httpMethodKey, method, httpPathKey, path, httpStatusCodeKey, status),
h.serverLatency.M(elapsed))
stats.RecordWithTags(
ctx,
diagUtils.WithTags(h.serverResponseCount.Name(), appIDKey, h.appID, httpPathKey, path, httpMethodKey, method, httpStatusCodeKey, status),
h.serverResponseCount.M(1))
} else {
stats.RecordWithTags(
ctx,
Expand Down Expand Up @@ -234,7 +243,8 @@ func (h *httpMetrics) Init(appID string, legacy bool) error {
serverTags = []tag.Key{appIDKey, httpMethodKey, httpStatusCodeKey}
clientTags = []tag.Key{appIDKey, httpStatusCodeKey}
}
return view.Register(

views := []*view.View{
diagUtils.NewMeasureView(h.serverRequestBytes, tags, defaultSizeDistribution),
diagUtils.NewMeasureView(h.serverResponseBytes, tags, defaultSizeDistribution),
diagUtils.NewMeasureView(h.serverLatency, serverTags, defaultLatencyDistribution),
Expand All @@ -245,7 +255,13 @@ func (h *httpMetrics) Init(appID string, legacy bool) error {
diagUtils.NewMeasureView(h.clientCompletedCount, clientTags, view.Count()),
diagUtils.NewMeasureView(h.healthProbeRoundripLatency, []tag.Key{appIDKey, httpStatusCodeKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(h.healthProbeCompletedCount, []tag.Key{appIDKey, httpStatusCodeKey}, view.Count()),
)
}

if h.legacy {
views = append(views, diagUtils.NewMeasureView(h.serverResponseCount, serverTags, view.Count()))
}

return view.Register(views...)
}

// HTTPMiddleware is the middleware to track HTTP server-side requests.
Expand Down
40 changes: 36 additions & 4 deletions tests/integration/framework/process/daprd/daprd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -288,14 +289,14 @@ func (d *Daprd) Namespace() string {
return d.namespace
}

func (d *Daprd) AppPort() int {
return d.appPort
}

func (d *Daprd) ipPort(port int) string {
return "127.0.0.1:" + strconv.Itoa(port)
}

func (d *Daprd) AppPort() int {
return d.appPort
}

func (d *Daprd) AppAddress() string {
return d.ipPort(d.AppPort())
}
Expand Down Expand Up @@ -370,3 +371,34 @@ func (d *Daprd) Metrics(t *testing.T, ctx context.Context) map[string]float64 {

return metrics
}

func (d *Daprd) HTTPGet2xx(t *testing.T, ctx context.Context, path string) {
t.Helper()
d.http2xx(t, ctx, http.MethodGet, path, nil)
}

func (d *Daprd) HTTPPost2xx(t *testing.T, ctx context.Context, path string, body io.Reader, headers ...string) {
t.Helper()
d.http2xx(t, ctx, http.MethodPost, path, body, headers...)
}

func (d *Daprd) http2xx(t *testing.T, ctx context.Context, method, path string, body io.Reader, headers ...string) {
t.Helper()

require.Zero(t, len(headers)%2, "headers must be key-value pairs")

path = strings.TrimPrefix(path, "/")
url := fmt.Sprintf("http://%s/%s", d.HTTPAddress(), path)
req, err := http.NewRequestWithContext(ctx, method, url, body)
require.NoError(t, err)

for i := 0; i < len(headers); i += 2 {
req.Header.Set(headers[i], headers[i+1])
}

resp, err := d.httpClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.GreaterOrEqual(t, resp.StatusCode, 200, "expected 2xx status code")
require.Less(t, resp.StatusCode, 300, "expected 2xx status code")
}
27 changes: 27 additions & 0 deletions tests/integration/framework/process/daprd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ limitations under the License.
package daprd

import (
"fmt"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -168,6 +171,17 @@ func WithResourceFiles(files ...string) Option {
}
}

func WithInMemoryStateStore(storeName string) Option {
return WithResourceFiles(`apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: ` + storeName + `
spec:
type: state.in-memory
version: v1
`)
}

// WithInMemoryActorStateStore adds an in-memory state store component, which is also enabled as actor state store.
func WithInMemoryActorStateStore(storeName string) Option {
return WithResourceFiles(`apiVersion: dapr.io/v1alpha1
Expand Down Expand Up @@ -195,6 +209,19 @@ func WithConfigs(configs ...string) Option {
}
}

func WithConfigManifests(t *testing.T, manifests ...string) Option {
configs := make([]string, len(manifests))
for i, manifest := range manifests {
f := filepath.Join(t.TempDir(), fmt.Sprintf("config-%d.yaml", i))
require.NoError(t, os.WriteFile(f, []byte(manifest), 0o600))
configs[i] = f
}

return func(o *options) {
o.configs = append(o.configs, configs...)
}
}

func WithPlacementAddresses(addresses ...string) Option {
return func(o *options) {
o.placementAddresses = addresses
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,81 +11,95 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics
package grpc

import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
commonv1 "github.com/dapr/dapr/pkg/proto/common/v1"
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/http/app"
"github.com/dapr/dapr/tests/integration/suite"
)

func init() {
suite.Register(new(grpcServer))
suite.Register(new(basic))
}

// grpcServer tests daprd metrics for the gRPC server
type grpcServer struct {
base
// basic tests daprd metrics for the gRPC server
type basic struct {
daprd *daprd.Daprd
}

func (m *grpcServer) Setup(t *testing.T) []framework.Option {
return m.testSetup(t)
func (b *basic) Setup(t *testing.T) []framework.Option {
app := app.New(t,
app.WithHandlerFunc("/hi", func(w http.ResponseWriter, _ *http.Request) {
fmt.Fprint(w, "OK")
}),
)

b.daprd = daprd.New(t,
daprd.WithAppPort(app.Port()),
daprd.WithAppProtocol("http"),
daprd.WithAppID("myapp"),
daprd.WithInMemoryStateStore("mystore"),
)

return []framework.Option{
framework.WithProcesses(app, b.daprd),
}
}

func (m *grpcServer) Run(t *testing.T, ctx context.Context) {
m.beforeRun(t, ctx)
func (b *basic) Run(t *testing.T, ctx context.Context) {
b.daprd.WaitUntilRunning(t, ctx)

t.Run("service invocation", func(t *testing.T) {
reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second)
t.Cleanup(reqCancel)
client := b.daprd.GRPCClient(t, ctx)

t.Run("service invocation", func(t *testing.T) {
// Invoke
_, err := m.grpcClient.InvokeService(reqCtx, &runtimev1pb.InvokeServiceRequest{
_, err := client.InvokeService(ctx, &rtv1.InvokeServiceRequest{
Id: "myapp",
Message: &commonv1pb.InvokeRequest{
Message: &commonv1.InvokeRequest{
Method: "hi",
HttpExtension: &commonv1pb.HTTPExtension{
Verb: commonv1pb.HTTPExtension_GET,
HttpExtension: &commonv1.HTTPExtension{
Verb: commonv1.HTTPExtension_GET,
},
},
})
require.NoError(t, err)

// Verify metrics
metrics := m.getMetrics(t, ctx)
metrics := b.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/InvokeService|grpc_server_status:OK"]))
})

t.Run("state stores", func(t *testing.T) {
reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second)
t.Cleanup(reqCancel)

// Write state
_, err := m.grpcClient.SaveState(reqCtx, &runtimev1pb.SaveStateRequest{
_, err := client.SaveState(ctx, &rtv1.SaveStateRequest{
StoreName: "mystore",
States: []*commonv1pb.StateItem{
States: []*commonv1.StateItem{
{Key: "myvalue", Value: []byte(`"hello world"`)},
},
})
require.NoError(t, err)

// Get state
_, err = m.grpcClient.GetState(reqCtx, &runtimev1pb.GetStateRequest{
_, err = client.GetState(ctx, &rtv1.GetStateRequest{
StoreName: "mystore",
Key: "myvalue",
})
require.NoError(t, err)

// Verify metrics
metrics := m.getMetrics(t, ctx)
metrics := b.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/SaveState|grpc_server_status:OK"]))
assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/GetState|grpc_server_status:OK"]))
})
Expand Down
Loading

0 comments on commit 1ac185d

Please sign in to comment.