Skip to content

Commit

Permalink
adding some description instrumentation (uber#6242)
Browse files Browse the repository at this point in the history
This adds a metric about workflow describe status, because I need this information for a future project. The metric is functionally identical to the existing frontend instrumentation, but adds the workflow status as a dimension.
  • Loading branch information
davidporter-id-au committed Aug 21, 2024
1 parent 69d0ce7 commit 947c4f7
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 0 deletions.
9 changes: 9 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,9 @@ const (
FrontendQueryWorkflowScope
// FrontendDescribeWorkflowExecutionScope is the metric scope for frontend.DescribeWorkflowExecution
FrontendDescribeWorkflowExecutionScope
// FrontendDescribeWorkflowExecutionStatusScope is a custom metric scope for
// richer details about workflow description calls, including workflow open/closed status
FrontendDescribeWorkflowExecutionStatusScope
// FrontendDescribeTaskListScope is the metric scope for frontend.DescribeTaskList
FrontendDescribeTaskListScope
// FrontendResetStickyTaskListScope is the metric scope for frontend.ResetStickyTaskList
Expand Down Expand Up @@ -1801,6 +1804,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendDeprecateDomainScope: {operation: "DeprecateDomain"},
FrontendQueryWorkflowScope: {operation: "QueryWorkflow"},
FrontendDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
FrontendDescribeWorkflowExecutionStatusScope: {operation: "DescribeWorkflowExecutionStatus"},
FrontendListTaskListPartitionsScope: {operation: "FrontendListTaskListPartitions"},
FrontendGetTaskListsByDomainScope: {operation: "FrontendGetTaskListsByDomain"},
FrontendRefreshWorkflowTasksScope: {operation: "FrontendRefreshWorkflowTasks"},
Expand Down Expand Up @@ -2129,6 +2133,9 @@ const (
KafkaConsumerMessageNackDlqErr
KafkaConsumerSessionStart

DescribeWorkflowStatusCount
DescribeWorkflowStatusError

GracefulFailoverLatency
GracefulFailoverFailure

Expand Down Expand Up @@ -2880,6 +2887,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter},
ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter},
HashringViewIdentifier: {metricName: "hashring_view_identifier", metricType: Counter},
DescribeWorkflowStatusError: {metricName: "describe_wf_error", metricType: Counter},
DescribeWorkflowStatusCount: {metricName: "describe_wf_status", metricType: Counter},

AsyncRequestPayloadSize: {metricName: "async_request_payload_size_per_domain", metricRollupName: "async_request_payload_size", metricType: Timer},

Expand Down
7 changes: 7 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"
workflowTerminationReason = "workflow_termination_reason"
workflowCloseStatus = "workflow_close_status"

// limiter-side tags
globalRatelimitKey = "global_ratelimit_key"
Expand Down Expand Up @@ -285,6 +286,12 @@ func WorkflowTerminationReasonTag(value string) Tag {
return simpleMetric{key: workflowTerminationReason, value: value}
}

// WorkflowCloseStatusTag returns a Tag keyed by workflowCloseStatus whose
// value is the given stringified workflow status. Every match of
// safeAlphaNumericStringRE in value is replaced with "_" before the tag is
// built (presumably to strip characters that are unsafe in metric tag values;
// confirm against the regexp's definition).
func WorkflowCloseStatusTag(value string) Tag {
	return simpleMetric{
		key:   workflowCloseStatus,
		value: safeAlphaNumericStringRE.ReplaceAllString(value, "_"),
	}
}

// PartitionConfigTags returns a list of partition config tags
func PartitionConfigTags(partitionConfig map[string]string) []Tag {
tags := make([]Tag, 0, len(partitionConfig))
Expand Down
19 changes: 19 additions & 0 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3415,6 +3415,8 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(
Request: request,
})

wh.emitDescribeWorkflowExecutionMetrics(domainName, response, err)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4071,6 +4073,23 @@ func (hs HealthStatus) String() string {
}
}

// emitDescribeWorkflowExecutionMetrics records the outcome of a
// DescribeWorkflowExecution call under the
// FrontendDescribeWorkflowExecutionStatusScope scope, tagged with the domain.
//
// When err is non-nil or response is nil, DescribeWorkflowStatusError is
// incremented and nothing else is emitted. Otherwise
// DescribeWorkflowStatusCount is incremented with an additional close-status
// tag: the string form of the response's CloseStatus, or "unknown" when the
// execution info or its CloseStatus is nil.
//
// NOTE(review): a workflow that is still open presumably has a nil
// CloseStatus and is therefore reported as "unknown" - confirm.
func (wh *WorkflowHandler) emitDescribeWorkflowExecutionMetrics(domain string, response *types.DescribeWorkflowExecutionResponse, err error) {
	domainScope := wh.GetMetricsClient().Scope(
		metrics.FrontendDescribeWorkflowExecutionStatusScope,
		metrics.DomainTag(domain),
	)

	// A failed call, or a call that produced no response, carries no status
	// to report; count it as an error and stop.
	if err != nil || response == nil {
		domainScope.IncCounter(metrics.DescribeWorkflowStatusError)
		return
	}

	closeStatus := "unknown"
	if info := response.WorkflowExecutionInfo; info != nil {
		if cs := info.CloseStatus; cs != nil {
			closeStatus = cs.String()
		}
	}

	domainScope.
		Tagged(metrics.WorkflowCloseStatusTag(closeStatus)).
		IncCounter(metrics.DescribeWorkflowStatusCount)
}

func getDomainWfIDRunIDTags(
domainName string,
wf *types.WorkflowExecution,
Expand Down
116 changes: 116 additions & 0 deletions service/frontend/api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
Expand Down Expand Up @@ -4363,3 +4364,118 @@ func (s *workflowHandlerSuite) TestTerminateWorkflowExecution() {
})

}

// TestWorkflowDescribeEmitStatusMetrics checks the counters emitted by
// emitDescribeWorkflowExecutionMetrics. Each case feeds a response/error pair
// to the handler and then looks up every expected counter in a tally test
// scope snapshot, comparing its name, value and tags.
func TestWorkflowDescribeEmitStatusMetrics(t *testing.T) {

	tests := map[string]struct {
		res              *types.DescribeWorkflowExecutionResponse
		err              error
		expectedCounters map[string]tally.CounterSnapshot
	}{
		"valid closed workflow": {
			res: &types.DescribeWorkflowExecutionResponse{
				WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
					CloseStatus: common.Ptr(types.WorkflowExecutionCloseStatusCompleted),
				},
			},
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_status+domain=some-domain,operation=DescribeWorkflowExecutionStatus,workflow_close_status=COMPLETED": &counterSnapshotMock{
					name: "describe_wf_status",
					tags: map[string]string{
						"domain":                "some-domain",
						"workflow_close_status": "COMPLETED",
						"operation":             "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
		"A workflow not found": {
			res: nil,
			err: &types.EntityNotExistsError{},
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_error+domain=some-domain,operation=DescribeWorkflowExecutionStatus": &counterSnapshotMock{
					name: "describe_wf_error",
					tags: map[string]string{
						"domain":    "some-domain",
						"operation": "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
		"A invalid input 1": {
			res: nil,
			err: nil,
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_error+domain=some-domain,operation=DescribeWorkflowExecutionStatus": &counterSnapshotMock{
					name: "describe_wf_error",
					tags: map[string]string{
						"domain":    "some-domain",
						"operation": "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
		"invalid input 2": {
			res: &types.DescribeWorkflowExecutionResponse{
				WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
					// intentionally nil
					// CloseStatus: common.Ptr(types.WorkflowExecutionCloseStatusCompleted),
				},
			},
			expectedCounters: map[string]tally.CounterSnapshot{
				"describe_wf_status+domain=some-domain,operation=DescribeWorkflowExecutionStatus,workflow_close_status=unknown": &counterSnapshotMock{
					name: "describe_wf_status",
					tags: map[string]string{
						"domain":                "some-domain",
						"workflow_close_status": "unknown",
						"operation":             "DescribeWorkflowExecutionStatus",
					},
					value: 1,
				},
			},
		},
	}

	for name, td := range tests {
		t.Run(name, func(t *testing.T) {

			// A fresh test scope per case keeps counters from leaking
			// between subtests.
			scope := tally.NewTestScope("", nil)
			mockR := resource.Test{
				MetricsScope:  scope,
				MetricsClient: metrics.NewClient(scope, 1),
			}

			wh := WorkflowHandler{
				Resource: &mockR,
			}

			wh.emitDescribeWorkflowExecutionMetrics("some-domain", td.res, td.err)

			// Fetch the counters once and reuse the map for every lookup.
			counters := scope.Snapshot().Counters()

			for key, want := range td.expectedCounters {
				got, ok := counters[key]
				if !ok {
					t.Errorf("the metric string expected was not found. Expected a map with this key: %q\ngot %v", key, counters)
					return
				}

				// testify expects (expected, actual); keeping that order
				// makes failure messages label the values correctly.
				assert.Equal(t, want.Name(), got.Name())
				assert.Equal(t, want.Value(), got.Value())
				assert.Equal(t, want.Tags(), got.Tags())
			}
		})
	}
}

// counterSnapshotMock is a fixed-value stand-in for a counter snapshot. It is
// used to spell out the name, tags and value a test expects a counter to have.
type counterSnapshotMock struct {
	name  string            // value returned by Name
	tags  map[string]string // value returned by Tags
	value int64             // value returned by Value
}

// Name returns the configured counter name.
func (m *counterSnapshotMock) Name() string {
	return m.name
}

// Tags returns the configured tag map as-is (no copy is made).
func (m *counterSnapshotMock) Tags() map[string]string {
	return m.tags
}

// Value returns the configured counter value.
func (m *counterSnapshotMock) Value() int64 {
	return m.value
}

0 comments on commit 947c4f7

Please sign in to comment.