Skip to content

Commit

Permalink
Merge branch 'master' into scheduler-update-cron-staging
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshVanL authored Oct 29, 2024
2 parents 5840b47 + 65ad4cd commit 805f781
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 61 deletions.
29 changes: 13 additions & 16 deletions .github/workflows/kind-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,31 +66,28 @@ jobs:
fail-fast: false # Keep running if one leg fails.
matrix:
k8s-version:
- v1.23.13
- v1.24.7
- v1.25.3
- v1.31.0
- v1.30.4
- v1.29.8
mode:
- ha
- non-ha
# Map between K8s and KinD versions.
# This is attempting to make it a bit clearer what's being tested.
# See: https://github.com/kubernetes-sigs/kind/releases/tag/v0.11.1
include:
- k8s-version: v1.23.13
kind-version: v0.17.0
kind-image-sha: sha256:ef453bb7c79f0e3caba88d2067d4196f427794086a7d0df8df4f019d5e336b61
- k8s-version: v1.31.0
kind-version: v0.24.0
kind-image-sha: sha256:53df588e04085fd41ae12de0c3fe4c72f7013bba32a20e7325357a1ac94ba865
dapr-test-config-store: "postgres"
- k8s-version: v1.24.7
kind-version: v0.17.0
kind-image-sha: sha256:577c630ce8e509131eab1aea12c022190978dd2f745aac5eb1fe65c0807eb315
- k8s-version: v1.30.4
kind-version: v0.24.0
kind-image-sha: sha256:976ea815844d5fa93be213437e3ff5754cd599b040946b5cca43ca45c2047114
dapr-test-config-store: "redis"
- k8s-version: v1.25.3
kind-version: v0.17.0
kind-image-sha: sha256:f52781bc0d7a19fb6c405c2af83abfeb311f130707a0e219175677e366cc45d1
- k8s-version: v1.29.8
kind-version: v0.24.0
kind-image-sha: sha256:d46b7aa29567e93b27f7531d258c372e829d7224b25e3fc6ffdefed12476d3aa
dapr-test-config-store: "redis"
exclude:
- k8s-version: v1.23.13
mode: non-ha
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down Expand Up @@ -134,7 +131,7 @@ jobs:
echo "DAPR_TEST_LOG_PATH=$GITHUB_WORKSPACE/test_logs/${{ matrix.k8s-version }}_${{ matrix.mode }}" >> $GITHUB_ENV
- name: Create KinD Cluster
uses: helm/kind-action@v1.5.0
uses: helm/kind-action@v1.10.0
with:
config: kind.yaml
cluster_name: kind
Expand Down
1 change: 1 addition & 0 deletions docs/development/dapr-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Dapr uses prometheus process and go collectors by default.
* dapr_resiliency_loaded: The number of resiliency policies loaded.
* dapr_resiliency_count: The number of times a resiliency policy has been executed.
* dapr_resiliency_activations_total: Number of times a resiliency policy has been activated in a building block after a failure or after a state change.
* dapr_resiliency_cb_state: A resiliency policy's current CircuitBreakerState state. 4 series are generated, one for each possible state, with the tag "status" being [unknown, closed, half-open, open]. The current state is 1, all other states are 0.

#### Workflow metrics

Expand Down
8 changes: 4 additions & 4 deletions pkg/api/grpc/proxy/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,10 @@ func (s *proxyTestSuite) TestResiliencyUnary() {
require.NoError(t, err)
assert.Len(t, rows, 2)
// 2 Ping failures
assert.Equal(t, int64(2), diag.GetValueForObservationWithTagSet(
assert.Equal(t, int64(2), diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{diag.NewTag("status", strconv.Itoa(int(codes.Internal))): true}))
// 1 success
assert.Equal(t, int64(1), diag.GetValueForObservationWithTagSet(
assert.Equal(t, int64(1), diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{diag.NewTag("status", strconv.Itoa(int(codes.OK))): true}))
})

Expand Down Expand Up @@ -548,7 +548,7 @@ func assertResponseReceiveMetricsSameCode(t *testing.T, requestType string, code
rows, err := view.RetrieveData(serviceInvocationResponseRecvName)
require.NoError(t, err)
assert.Len(t, rows, 1)
count := diag.GetValueForObservationWithTagSet(
count := diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{
diag.NewTag("status", strconv.Itoa(int(code))): true,
diag.NewTag("type", requestType): true,
Expand All @@ -562,7 +562,7 @@ func assertRequestSentMetrics(t *testing.T, requestType string, requestsSentExpe
rows, err := view.RetrieveData(serviceInvocationRequestSentName)
require.NoError(t, err)
assert.Len(t, rows, 1)
requestsSent := diag.GetValueForObservationWithTagSet(
requestsSent := diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{diag.NewTag("type", requestType): true})

if assertEqualFn == nil {
Expand Down
58 changes: 51 additions & 7 deletions pkg/diagnostics/resiliency_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opencensus.io/tag"

diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
"github.com/dapr/dapr/pkg/resiliency/breaker"
)

var (
Expand All @@ -17,16 +18,24 @@ var (

OutboundPolicyFlowDirection PolicyFlowDirection = "outbound"
InboundPolicyFlowDirection PolicyFlowDirection = "inbound"

cbStatuses = []string{
string(breaker.StateClosed),
string(breaker.StateHalfOpen),
string(breaker.StateOpen),
string(breaker.StateUnknown),
}
)

type PolicyType string

type PolicyFlowDirection string

type resiliencyMetrics struct {
policiesLoadCount *stats.Int64Measure
executionCount *stats.Int64Measure
activationsCount *stats.Int64Measure
policiesLoadCount *stats.Int64Measure
executionCount *stats.Int64Measure
activationsCount *stats.Int64Measure
circuitbreakerState *stats.Int64Measure

appID string
ctx context.Context
Expand All @@ -47,7 +56,10 @@ func newResiliencyMetrics() *resiliencyMetrics {
"resiliency/activations_total",
"Number of times a resiliency policyKey has been activated in a building block after a failure or after a state change.",
stats.UnitDimensionless),

circuitbreakerState: stats.Int64(
"resiliency/cb_state",
"A resiliency policy's current CircuitBreakerState state. 0 is closed, 1 is half-open, 2 is open, and -1 is unknown.",
stats.UnitDimensionless),
// TODO: how to use correct context
ctx: context.Background(),
enabled: false,
Expand All @@ -63,6 +75,7 @@ func (m *resiliencyMetrics) Init(id string) error {
diagUtils.NewMeasureView(m.policiesLoadCount, []tag.Key{appIDKey, resiliencyNameKey, namespaceKey}, view.Count()),
diagUtils.NewMeasureView(m.executionCount, []tag.Key{appIDKey, resiliencyNameKey, policyKey, namespaceKey, flowDirectionKey, targetKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(m.activationsCount, []tag.Key{appIDKey, resiliencyNameKey, policyKey, namespaceKey, flowDirectionKey, targetKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(m.circuitbreakerState, []tag.Key{appIDKey, resiliencyNameKey, policyKey, namespaceKey, flowDirectionKey, targetKey, statusKey}, view.LastValue()),
)
}

Expand All @@ -80,12 +93,42 @@ func (m *resiliencyMetrics) PolicyLoaded(resiliencyName, namespace string) {
// PolicyWithStatusExecuted records metric when policy is executed with added status information (e.g., circuit breaker open).
func (m *resiliencyMetrics) PolicyWithStatusExecuted(resiliencyName, namespace string, policy PolicyType, flowDirection PolicyFlowDirection, target string, status string) {
if m.enabled {
// Common tags for all metrics
commonTags := []interface{}{
appIDKey, m.appID,
resiliencyNameKey, resiliencyName,
policyKey, string(policy),
namespaceKey, namespace,
flowDirectionKey, string(flowDirection),
targetKey, target,
statusKey, // status appened on each recording
}

// Record count metric for all resiliency executions
_ = stats.RecordWithTags(
m.ctx,
diagUtils.WithTags(m.executionCount.Name(), appIDKey, m.appID, resiliencyNameKey, resiliencyName, policyKey, string(policy),
namespaceKey, namespace, flowDirectionKey, string(flowDirection), targetKey, target, statusKey, status),
diagUtils.WithTags(m.executionCount.Name(), append(commonTags, status)...),
m.executionCount.M(1),
)

// Record cb gauge, 4 metrics, one for each cb state, with the active state having a value of 1, otherwise 0
if policy == CircuitBreakerPolicy {
for _, s := range cbStatuses {
if s == status {
_ = stats.RecordWithTags(
m.ctx,
diagUtils.WithTags(m.circuitbreakerState.Name(), append(commonTags, s)...),
m.circuitbreakerState.M(1),
)
} else {
_ = stats.RecordWithTags(
m.ctx,
diagUtils.WithTags(m.circuitbreakerState.Name(), append(commonTags, s)...),
m.circuitbreakerState.M(0),
)
}
}
}
}
}

Expand All @@ -99,9 +142,10 @@ func (m *resiliencyMetrics) PolicyActivated(resiliencyName, namespace string, po
m.PolicyWithStatusActivated(resiliencyName, namespace, policy, flowDirection, target, "")
}

// PolicyWithStatusActivated records metric when policy is activated after a failure or in the case of circuit breaker after a state change. with added state/status (e.g., circuit breaker open).
// PolicyWithStatusActivated records metrics when policy is activated after a failure or in the case of circuit breaker after a state change. with added state/status (e.g., circuit breaker open).
func (m *resiliencyMetrics) PolicyWithStatusActivated(resiliencyName, namespace string, policy PolicyType, flowDirection PolicyFlowDirection, target string, status string) {
if m.enabled {
// Record combined activation measure
_ = stats.RecordWithTags(
m.ctx,
diagUtils.WithTags(m.activationsCount.Name(), appIDKey, m.appID, resiliencyNameKey, resiliencyName, policyKey, string(policy),
Expand Down
42 changes: 31 additions & 11 deletions pkg/diagnostics/resiliency_monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
const (
resiliencyCountViewName = "resiliency/count"
resiliencyActivationViewName = "resiliency/activations_total"
resiliencyCBStateViewName = "resiliency/cb_state"
resiliencyLoadedViewName = "resiliency/loaded"
testAppID = "fakeID"
testResiliencyName = "testResiliency"
Expand All @@ -33,7 +34,8 @@ func cleanupRegisteredViews() {
diag.CleanupRegisteredViews(
resiliencyCountViewName,
resiliencyLoadedViewName,
resiliencyActivationViewName)
resiliencyActivationViewName,
resiliencyCBStateViewName)
}

func TestResiliencyCountMonitoring(t *testing.T) {
Expand Down Expand Up @@ -202,10 +204,11 @@ func TestResiliencyCountMonitoring(t *testing.T) {

func TestResiliencyCountMonitoringCBStates(t *testing.T) {
tests := []struct {
name string
unitFn func()
wantNumberOfRows int
wantCbStateTagCount map[tag.Tag]int64
name string
unitFn func()
wantNumberOfRows int
wantCbStateTagCount map[tag.Tag]int64
wantCbStateLastValue tag.Tag
}{
{
name: "EndpointPolicyCloseState",
Expand All @@ -219,8 +222,9 @@ func TestResiliencyCountMonitoringCBStates(t *testing.T) {
})
}
},
wantNumberOfRows: 3,
wantCbStateTagCount: map[tag.Tag]int64{diag.NewTag(diag.StatusKey.Name(), "closed"): 2},
wantNumberOfRows: 3,
wantCbStateTagCount: map[tag.Tag]int64{diag.NewTag(diag.StatusKey.Name(), "closed"): 2},
wantCbStateLastValue: diag.NewTag(diag.StatusKey.Name(), "closed"),
},
{
name: "EndpointPolicyOpenState",
Expand All @@ -239,6 +243,7 @@ func TestResiliencyCountMonitoringCBStates(t *testing.T) {
diag.NewTag(diag.StatusKey.Name(), string(breaker.StateClosed)): 2,
diag.NewTag(diag.StatusKey.Name(), string(breaker.StateOpen)): 1,
},
wantCbStateLastValue: diag.NewTag(diag.StatusKey.Name(), "open"),
},
{
name: "EndpointPolicyHalfOpenState",
Expand All @@ -265,6 +270,7 @@ func TestResiliencyCountMonitoringCBStates(t *testing.T) {
diag.NewTag(diag.StatusKey.Name(), string(breaker.StateOpen)): 1,
diag.NewTag(diag.StatusKey.Name(), string(breaker.StateHalfOpen)): 1,
},
wantCbStateLastValue: diag.NewTag(diag.StatusKey.Name(), "half-open"),
},
}

Expand All @@ -277,6 +283,10 @@ func TestResiliencyCountMonitoringCBStates(t *testing.T) {
require.NoError(t, err)
require.Len(t, rows, test.wantNumberOfRows)

rowsCbState, err := view.RetrieveData(resiliencyCBStateViewName)
require.NoError(t, err)
require.NotNil(t, rowsCbState)

wantedTags := []tag.Tag{
diag.NewTag("app_id", testAppID),
diag.NewTag("name", testResiliencyName),
Expand All @@ -291,9 +301,19 @@ func TestResiliencyCountMonitoringCBStates(t *testing.T) {
diag.RequireTagExist(t, rows, wantTag)
}
for cbTag, wantCount := range test.wantCbStateTagCount {
gotCount := diag.GetValueForObservationWithTagSet(
gotCount := diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{cbTag: true, diag.NewTag(diag.PolicyKey.Name(), string(diag.CircuitBreakerPolicy)): true})
require.Equal(t, wantCount, gotCount)

// Current (last value) state should have a value of 1, others should be 0
found, gotValue := diag.GetLastValueForObservationWithTagset(
rowsCbState, map[tag.Tag]bool{cbTag: true, diag.NewTag(diag.PolicyKey.Name(), string(diag.CircuitBreakerPolicy)): true})
require.True(t, found)
if cbTag.Value == test.wantCbStateLastValue.Value {
require.InDelta(t, float64(1), gotValue, 0)
} else {
require.InDelta(t, float64(0), gotValue, 0)
}
}
})
}
Expand Down Expand Up @@ -460,15 +480,15 @@ func TestResiliencyActivationsCountMonitoring(t *testing.T) {
diag.RequireTagExist(t, rows, wantTag)
}
for cbTag, wantCount := range test.wantCbStateTagCount {
gotCount := diag.GetValueForObservationWithTagSet(
gotCount := diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{cbTag: true, diag.NewTag(diag.PolicyKey.Name(), string(diag.CircuitBreakerPolicy)): true})
require.Equal(t, wantCount, gotCount)
}
gotRetriesCount := diag.GetValueForObservationWithTagSet(
gotRetriesCount := diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{diag.NewTag(diag.PolicyKey.Name(), string(diag.RetryPolicy)): true})
require.Equal(t, test.wantRetriesCount, gotRetriesCount)

gotTimeoutCount := diag.GetValueForObservationWithTagSet(
gotTimeoutCount := diag.GetCountValueForObservationWithTagSet(
rows, map[tag.Tag]bool{diag.NewTag(diag.PolicyKey.Name(), string(diag.TimeoutPolicy)): true})
require.Equal(t, test.wantTimeoutCount, gotTimeoutCount)
})
Expand Down
21 changes: 19 additions & 2 deletions pkg/diagnostics/testutils_unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func NewTag(key string, value string) tag.Tag {
}
}

// GetValueForObservationWithTagSet is a helper to find a row out of a slice of rows retrieved when executing view.RetrieveData
// GetCountValueForObservationWithTagSet is a helper to find a row out of a slice of rows retrieved when executing view.RetrieveData
// This particular row should have the tags present in the tag set.
func GetValueForObservationWithTagSet(rows []*view.Row, wantedTagSetCount map[tag.Tag]bool) int64 {
func GetCountValueForObservationWithTagSet(rows []*view.Row, wantedTagSetCount map[tag.Tag]bool) int64 {
for _, row := range rows {
foundTags := 0
for _, aTag := range row.Tags {
Expand All @@ -39,6 +39,23 @@ func GetValueForObservationWithTagSet(rows []*view.Row, wantedTagSetCount map[ta
return 0
}

// GetLastValueForObservationWithTagset is a helper to find a row out of a slice of rows retrieved when executing view.RetrieveData
// This particular row should have the tags present in the tag set.
func GetLastValueForObservationWithTagset(rows []*view.Row, wantedTagSetCount map[tag.Tag]bool) (bool, float64) {
for _, row := range rows {
foundTags := 0
for _, aTag := range row.Tags {
if wantedTagSetCount[aTag] {
foundTags++
}
}
if foundTags == len(wantedTagSetCount) {
return true, row.Data.(*view.LastValueData).Value
}
}
return false, -1
}

// RequireTagExist tries to find a tag in a slice of rows return from view.RetrieveData
func RequireTagExist(t *testing.T, rows []*view.Row, wantedTag tag.Tag) {
t.Helper()
Expand Down
8 changes: 0 additions & 8 deletions tests/config/kafka_override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ affinity:
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64

zookeeper:
persistence:
Expand All @@ -47,7 +43,3 @@ zookeeper:
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
4 changes: 0 additions & 4 deletions tests/config/mongodb_override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,3 @@ affinity:
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
4 changes: 0 additions & 4 deletions tests/config/postgres_override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ primary:
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
persistence:
enabled: false
tls:
Expand Down
4 changes: 0 additions & 4 deletions tests/config/redis_override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,3 @@ master:
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
Loading

0 comments on commit 805f781

Please sign in to comment.