Skip to content

Commit

Permalink
UpdateWorkflow ShardId based metrics (cadence-workflow#5080)
Browse files Browse the repository at this point in the history
Add shardId metrics to the UpdateWorkflowExecution operation in persistence. Also add a reusable, explicitly named function that other operations can use if they require shardId metrics.
  • Loading branch information
allenchen2244 authored Feb 8, 2023
1 parent 14142e0 commit f5efe3a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 5 deletions.
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1839,8 +1839,10 @@ const (
PersistenceEmptyResponseCounter

PersistenceRequestsPerDomain
PersistenceRequestsPerShard
PersistenceFailuresPerDomain
PersistenceLatencyPerDomain
PersistenceLatencyPerShard
PersistenceErrShardExistsCounterPerDomain
PersistenceErrShardOwnershipLostCounterPerDomain
PersistenceErrConditionFailedCounterPerDomain
Expand Down Expand Up @@ -2417,8 +2419,10 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
PersistenceSampledCounter: {metricName: "persistence_sampled", metricType: Counter},
PersistenceEmptyResponseCounter: {metricName: "persistence_empty_response", metricType: Counter},
PersistenceRequestsPerDomain: {metricName: "persistence_requests_per_domain", metricRollupName: "persistence_requests", metricType: Counter},
PersistenceRequestsPerShard: {metricName: "persistence_requests_per_shard", metricType: Counter},
PersistenceFailuresPerDomain: {metricName: "persistence_errors_per_domain", metricRollupName: "persistence_errors", metricType: Counter},
PersistenceLatencyPerDomain: {metricName: "persistence_latency_per_domain", metricRollupName: "persistence_latency", metricType: Timer},
PersistenceLatencyPerShard: {metricName: "persistence_latency_per_shard", metricType: Timer},
PersistenceErrShardExistsCounterPerDomain: {metricName: "persistence_errors_shard_exists_per_domain", metricRollupName: "persistence_errors_shard_exists", metricType: Counter},
PersistenceErrShardOwnershipLostCounterPerDomain: {metricName: "persistence_errors_shard_ownership_lost_per_domain", metricRollupName: "persistence_errors_shard_ownership_lost", metricType: Counter},
PersistenceErrConditionFailedCounterPerDomain: {metricName: "persistence_errors_condition_failed_per_domain", metricRollupName: "persistence_errors_condition_failed", metricType: Counter},
Expand Down
5 changes: 5 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
caller = "caller"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"

allValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -80,6 +81,10 @@ func metricWithUnknown(key, value string) Tag {
return simpleMetric{key: key, value: value}
}

// ShardIDTag returns a new tag keyed by shard_id whose value is the given
// shard ID string. The tag is built through metricWithUnknown.
// NOTE(review): presumably a blank shardIDStr is reported as the unknown
// value, as with the other tags built that way — confirm against
// metricWithUnknown.
func ShardIDTag(shardIDStr string) Tag {
	tag := metricWithUnknown(shardID, shardIDStr)
	return tag
}

// DomainTag returns a new domain tag. For timers, this also ensures that we
// dual emit the metric with the all tag. If a blank domain is provided then
// this converts that to an unknown domain.
Expand Down
26 changes: 25 additions & 1 deletion common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package persistence

import (
"context"
"strconv"
"time"

"github.com/uber/cadence/common/config"
Expand Down Expand Up @@ -292,6 +293,29 @@ func (p *persistenceMetricsClientBase) updateErrorMetric(scope int, err error, m
}
}

// callWithDomainAndShardScope runs op and reports its request count and
// latency twice: once on a metrics scope tagged with domainTag and once on a
// metrics scope tagged with shardIDTag.
//
// The latency histogram (emitted only when enableLatencyHistogramMetrics is
// set) and the error metrics are recorded on the domain-tagged scope only.
// The error returned by op is handed back to the caller unchanged.
func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op func() error, domainTag metrics.Tag, shardIDTag metrics.Tag) error {
	perDomain := p.metricClient.Scope(scope, domainTag)
	perShard := p.metricClient.Scope(scope, shardIDTag)

	// Requests are counted up front, regardless of how op turns out.
	perDomain.IncCounter(metrics.PersistenceRequestsPerDomain)
	perShard.IncCounter(metrics.PersistenceRequestsPerShard)

	// Time only the wrapped operation itself.
	startedAt := time.Now()
	opErr := op()
	elapsed := time.Since(startedAt)

	perDomain.RecordTimer(metrics.PersistenceLatencyPerDomain, elapsed)
	perShard.RecordTimer(metrics.PersistenceLatencyPerShard, elapsed)
	if p.enableLatencyHistogramMetrics {
		perDomain.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, elapsed)
	}

	if opErr == nil {
		return nil
	}
	p.updateErrorMetricPerDomain(scope, opErr, perDomain)
	return opErr
}

func (p *persistenceMetricsClientBase) call(scope int, op func() error, tags ...metrics.Tag) error {
metricsScope := p.metricClient.Scope(scope, tags...)
if len(tags) > 0 {
Expand Down Expand Up @@ -418,7 +442,7 @@ func (p *workflowExecutionPersistenceClient) UpdateWorkflowExecution(
resp, err = p.persistence.UpdateWorkflowExecution(ctx, request)
return err
}
err := p.call(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
err := p.callWithDomainAndShardScope(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
if err != nil {
return nil, err
}
Expand Down
3 changes: 0 additions & 3 deletions service/history/decision/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func (handler *taskHandlerImpl) handleDecisions(

var results []*decisionResult
for _, decision := range decisions {

result, err := handler.handleDecisionWithResult(ctx, decision)
if err != nil || handler.stopProcessing {
return nil, err
Expand Down Expand Up @@ -214,12 +213,10 @@ func (handler *taskHandlerImpl) handleDecisionScheduleActivity(
ctx context.Context,
attr *types.ScheduleActivityTaskDecisionAttributes,
) (*decisionResult, error) {

handler.metricsClient.IncCounter(
metrics.HistoryRespondDecisionTaskCompletedScope,
metrics.DecisionTypeScheduleActivityCounter,
)

executionInfo := handler.mutableState.GetExecutionInfo()
domainID := executionInfo.DomainID
targetDomainID := domainID
Expand Down
1 change: 0 additions & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,6 @@ func (s *contextImpl) UpdateWorkflowExecution(
if s.isClosed() {
return nil, ErrShardClosed
}

ctx, cancel, err := s.ensureMinContextTimeout(ctx)
if err != nil {
return nil, err
Expand Down

0 comments on commit f5efe3a

Please sign in to comment.