Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/dapr/dapr into grpc-update
Browse files Browse the repository at this point in the history
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Nov 17, 2022
2 parents 3626a93 + c4c39b5 commit 1fed84a
Show file tree
Hide file tree
Showing 41 changed files with 3,228 additions and 573 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@
},
"workspaceFolder": "/workspaces/dapr",
"workspaceMount": "type=bind,source=${localWorkspaceFolder},target=/workspaces/dapr",
}
}
3 changes: 3 additions & 0 deletions .github/workflows/dapr-perf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ jobs:
DAPR_TEST_DURATION: 1m
DAPR_PAYLOAD: "{}"
run: make test-perf-actor_reminder
- name: Run stress test actor double activation with SDK
if: env.TEST_PREFIX != ''
run: make test-perf-actor_double_activation
- name: Save control plane logs
if: always() && env.TEST_PREFIX != ''
run: |
Expand Down
18 changes: 18 additions & 0 deletions dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ message GetMetadataResponse {
repeated ActiveActorsCount active_actors_count = 2;
repeated RegisteredComponents registered_components = 3;
map<string, string> extended_metadata = 4;
repeated PubsubSubscription subscriptions = 5;
}

message ActiveActorsCount {
Expand All @@ -541,6 +542,23 @@ message RegisteredComponents {
repeated string capabilities = 4;
}

message PubsubSubscription {
string pubsub_name = 1;
string topic = 2;
map<string,string> metadata = 3;
PubsubSubscriptionRules rules = 4;
string dead_letter_topic = 5;
}

message PubsubSubscriptionRules {
repeated PubsubSubscriptionRule rules = 1;
}

message PubsubSubscriptionRule {
string match = 1;
string path = 2;
}

message SetMetadataRequest {
string key = 1;
string value = 2;
Expand Down
8 changes: 8 additions & 0 deletions docs/development/dapr-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ Dapr uses prometheus process and go collectors by default.
* dapr_runtime_component_init_total: The number of initialized components
* dapr_runtime_component_init_fail_total: The number of component initialization failures

#### Service Invocation

* dapr_runtime_service_invocation_req_sent_total: The number of remote service invocation requests sent
* dapr_runtime_service_invocation_req_recv_total: The number of remote service invocation requests received
* dapr_runtime_service_invocation_res_sent_total: The number of remote service invocation responses sent
* dapr_runtime_service_invocation_res_recv_total: The number of remote service invocation responses received
* dapr_runtime_service_invocation_res_recv_latency_ms: The remote service invocation round trip latency

#### Security

* dapr_runtime_mtls_init_total: The number of successful mTLS authenticator initialization.
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ require (
github.com/googleapis/gax-go/v2 v2.6.0 // indirect
github.com/gorilla/css v1.0.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grafana/k6-operator v0.0.7
github.com/grandcat/zeroconf v1.0.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
Expand Down Expand Up @@ -266,7 +267,7 @@ require (
github.com/linkedin/goavro/v2 v2.9.8 // indirect
github.com/machinebox/graphql v0.2.2 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,8 @@ github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/k6-operator v0.0.7 h1:i19pns94OU2UYnOeMiDnuAQEmLm5Yh7Sd+jFNVYS9TY=
github.com/grafana/k6-operator v0.0.7/go.mod h1:5HKB/+2FEQY1w9JgnkJM8JVmh/jQcILIeOZe11eYmRU=
github.com/grandcat/zeroconf v1.0.0 h1:uHhahLBKqwWBV6WZUDAT71044vwOTL+McW0mBJvo6kE=
github.com/grandcat/zeroconf v1.0.0/go.mod h1:lTKmG1zh86XyCoUeIHSA4FJMBwCJiQmGfcP2PdzytEs=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
Expand Down Expand Up @@ -1044,8 +1046,9 @@ github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamh
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matoous/go-nanoid v1.5.0/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
github.com/matoous/go-nanoid/v2 v2.0.0 h1:d19kur2QuLeHmJBkvYkFdhFBzLoo1XVm2GgTpL+9Tj0=
github.com/matoous/go-nanoid/v2 v2.0.0/go.mod h1:FtS4aGPVfEkxKxhdWPAspZpZSh1cOjtM7Ej/So3hR0g=
Expand Down
26 changes: 1 addition & 25 deletions pkg/diagnostics/component_monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/stretchr/testify/assert"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

const (
Expand All @@ -21,29 +20,6 @@ func componentsMetrics() *componentMetrics {
return c
}

func allTagsPresent(t *testing.T, v *view.View, tags []tag.Tag) {
for _, k := range v.TagKeys {
found := false

if k.Name() == "" {
continue
}

for _, tag := range tags {
if tag.Key.Name() == "" {
continue
}

if k.Name() == tag.Key.Name() {
found = true
break
}
}

assert.True(t, found)
}
}

func TestPubSub(t *testing.T) {
t.Run("record ingress count", func(t *testing.T) {
c := componentsMetrics()
Expand Down Expand Up @@ -210,7 +186,7 @@ func TestSecrets(t *testing.T) {
})
}

func TestInit(t *testing.T) {
func TestComponentMetricsInit(t *testing.T) {
c := componentsMetrics()
assert.True(t, c.enabled)
assert.Equal(t, c.appID, "test")
Expand Down
125 changes: 115 additions & 10 deletions pkg/diagnostics/service_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package diagnostics

import (
"context"
"strconv"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand All @@ -12,16 +14,20 @@ import (

// Tag keys.
var (
componentKey = tag.MustNewKey("component")
failReasonKey = tag.MustNewKey("reason")
operationKey = tag.MustNewKey("operation")
actorTypeKey = tag.MustNewKey("actor_type")
trustDomainKey = tag.MustNewKey("trustDomain")
namespaceKey = tag.MustNewKey("namespace")
policyActionKey = tag.MustNewKey("policyAction")
resiliencyNameKey = tag.MustNewKey("name")
policyKey = tag.MustNewKey("policy")
componentNameKey = tag.MustNewKey("componentName")
componentKey = tag.MustNewKey("component")
failReasonKey = tag.MustNewKey("reason")
operationKey = tag.MustNewKey("operation")
actorTypeKey = tag.MustNewKey("actor_type")
trustDomainKey = tag.MustNewKey("trustDomain")
namespaceKey = tag.MustNewKey("namespace")
policyActionKey = tag.MustNewKey("policyAction")
resiliencyNameKey = tag.MustNewKey("name")
policyKey = tag.MustNewKey("policy")
componentNameKey = tag.MustNewKey("componentName")
destinationAppIDKey = tag.MustNewKey("dst_app_id")
sourceAppIDKey = tag.MustNewKey("src_app_id")
methodKey = tag.MustNewKey("method")
statusKey = tag.MustNewKey("status")
)

// serviceMetrics holds dapr runtime metric monitoring methods.
Expand Down Expand Up @@ -52,6 +58,13 @@ type serviceMetrics struct {
appPolicyActionBlocked *stats.Int64Measure
globalPolicyActionBlocked *stats.Int64Measure

// Service Invocation metrics
serviceInvocationRequestSentTotal *stats.Int64Measure
serviceInvocationRequestReceivedTotal *stats.Int64Measure
serviceInvocationResponseSentTotal *stats.Int64Measure
serviceInvocationResponseReceivedTotal *stats.Int64Measure
serviceInvocationResponseReceivedLatency *stats.Float64Measure

appID string
ctx context.Context
enabled bool
Expand Down Expand Up @@ -140,6 +153,28 @@ func newServiceMetrics() *serviceMetrics {
"The number of requests blocked by the global action specified in the access control policy.",
stats.UnitDimensionless),

// Service Invocation
serviceInvocationRequestSentTotal: stats.Int64(
"runtime/service_invocation/req_sent_total",
"The number of requests sent via service invocation.",
stats.UnitDimensionless),
serviceInvocationRequestReceivedTotal: stats.Int64(
"runtime/service_invocation/req_recv_total",
"The number of requests received via service invocation.",
stats.UnitDimensionless),
serviceInvocationResponseSentTotal: stats.Int64(
"runtime/service_invocation/res_sent_total",
"The number of responses sent via service invocation.",
stats.UnitDimensionless),
serviceInvocationResponseReceivedTotal: stats.Int64(
"runtime/service_invocation/res_recv_total",
"The number of responses received via service invocation.",
stats.UnitDimensionless),
serviceInvocationResponseReceivedLatency: stats.Float64(
"runtime/service_invocation/res_recv_latency_ms",
"The latency of service invocation response.",
stats.UnitMilliseconds),

// TODO: use the correct context for each request
ctx: context.Background(),
enabled: false,
Expand Down Expand Up @@ -172,6 +207,12 @@ func (s *serviceMetrics) Init(appID string) error {
diagUtils.NewMeasureView(s.globalPolicyActionAllowed, []tag.Key{appIDKey, trustDomainKey, namespaceKey, operationKey, httpMethodKey, policyActionKey}, view.Count()),
diagUtils.NewMeasureView(s.appPolicyActionBlocked, []tag.Key{appIDKey, trustDomainKey, namespaceKey, operationKey, httpMethodKey, policyActionKey}, view.Count()),
diagUtils.NewMeasureView(s.globalPolicyActionBlocked, []tag.Key{appIDKey, trustDomainKey, namespaceKey, operationKey, httpMethodKey, policyActionKey}, view.Count()),

diagUtils.NewMeasureView(s.serviceInvocationRequestSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationRequestReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedLatency, []tag.Key{appIDKey, sourceAppIDKey, methodKey, statusKey}, defaultLatencyDistribution),
)
}

Expand Down Expand Up @@ -364,3 +405,67 @@ func (s *serviceMetrics) RequestBlockedByGlobalAction(appID, trustDomain, namesp
s.globalPolicyActionBlocked.M(1))
}
}

// ServiceInvocationRequestSent records the number of service invocation requests sent.
func (s *serviceMetrics) ServiceInvocationRequestSent(destinationAppID, method string) {
if s.enabled {
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, s.appID,
destinationAppIDKey, destinationAppID,
methodKey, method),
s.serviceInvocationRequestSentTotal.M(1))
}
}

// ServiceInvocationRequestReceived records the number of service invocation requests received.
func (s *serviceMetrics) ServiceInvocationRequestReceived(sourceAppID, method string) {
if s.enabled {
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, s.appID,
sourceAppIDKey, sourceAppID,
methodKey, method),
s.serviceInvocationRequestReceivedTotal.M(1))
}
}

// ServiceInvocationResponseSent records the number of service invocation responses sent.
func (s *serviceMetrics) ServiceInvocationResponseSent(destinationAppID, method string, status int32) {
if s.enabled {
statusCode := strconv.Itoa(int(status))
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, s.appID,
destinationAppIDKey, destinationAppID,
methodKey, method,
statusKey, statusCode),
s.serviceInvocationResponseSentTotal.M(1))
}
}

// ServiceInvocationResponseReceived records the number of service invocation responses received.
func (s *serviceMetrics) ServiceInvocationResponseReceived(sourceAppID, method string, status int32, start time.Time) {
if s.enabled {
statusCode := strconv.Itoa(int(status))
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, s.appID,
sourceAppIDKey, sourceAppID,
methodKey, method,
statusKey, statusCode),
s.serviceInvocationResponseReceivedTotal.M(1))
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, s.appID,
sourceAppIDKey, sourceAppID,
methodKey, method,
statusKey, statusCode),
s.serviceInvocationResponseReceivedLatency.M(ElapsedSince(start)))
}
}
73 changes: 73 additions & 0 deletions pkg/diagnostics/service_monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package diagnostics

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opencensus.io/stats/view"
)

func servicesMetrics() *serviceMetrics {
s := newServiceMetrics()
s.Init("testAppId")

return s
}

func TestServiceInvocation(t *testing.T) {
t.Run("record service invocation request sent", func(t *testing.T) {
s := servicesMetrics()

s.ServiceInvocationRequestSent("testAppId2", "testMethod")

viewData, _ := view.RetrieveData("runtime/service_invocation/req_sent_total")
v := view.Find("runtime/service_invocation/req_sent_total")

allTagsPresent(t, v, viewData[0].Tags)
})

t.Run("record service invoation request received", func(t *testing.T) {
s := servicesMetrics()

s.ServiceInvocationRequestReceived("testAppId", "testMethod")

viewData, _ := view.RetrieveData("runtime/service_invocation/req_recv_total")
v := view.Find("runtime/service_invocation/req_recv_total")

allTagsPresent(t, v, viewData[0].Tags)
})

t.Run("record service invocation response sent", func(t *testing.T) {
s := servicesMetrics()

s.ServiceInvocationResponseSent("testAppId2", "testMethod", 200)

viewData, _ := view.RetrieveData("runtime/service_invocation/res_sent_total")
v := view.Find("runtime/service_invocation/res_sent_total")

allTagsPresent(t, v, viewData[0].Tags)
})

t.Run("record service invocation response received", func(t *testing.T) {
s := servicesMetrics()

s.ServiceInvocationResponseReceived("testAppId", "testMethod", 200, time.Now())

viewData, _ := view.RetrieveData("runtime/service_invocation/res_recv_total")
v := view.Find("runtime/service_invocation/res_recv_total")

allTagsPresent(t, v, viewData[0].Tags)

viewData2, _ := view.RetrieveData("runtime/service_invocation/res_recv_latency_ms")
v2 := view.Find("runtime/service_invocation/res_recv_latency_ms")

allTagsPresent(t, v2, viewData2[0].Tags)
})
}

func TestSerivceMonitoringInit(t *testing.T) {
c := servicesMetrics()
assert.True(t, c.enabled)
assert.Equal(t, c.appID, "testAppId")
}
Loading

0 comments on commit 1fed84a

Please sign in to comment.