diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 1cc19dcbdcc..cfa7b9e781f 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -1839,8 +1839,10 @@ const (
 	PersistenceEmptyResponseCounter
 
 	PersistenceRequestsPerDomain
+	PersistenceRequestsPerShard
 	PersistenceFailuresPerDomain
 	PersistenceLatencyPerDomain
+	PersistenceLatencyPerShard
 	PersistenceErrShardExistsCounterPerDomain
 	PersistenceErrShardOwnershipLostCounterPerDomain
 	PersistenceErrConditionFailedCounterPerDomain
@@ -2417,8 +2419,10 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		PersistenceSampledCounter: {metricName: "persistence_sampled", metricType: Counter},
 		PersistenceEmptyResponseCounter: {metricName: "persistence_empty_response", metricType: Counter},
 		PersistenceRequestsPerDomain: {metricName: "persistence_requests_per_domain", metricRollupName: "persistence_requests", metricType: Counter},
+		PersistenceRequestsPerShard: {metricName: "persistence_requests_per_shard", metricType: Counter},
 		PersistenceFailuresPerDomain: {metricName: "persistence_errors_per_domain", metricRollupName: "persistence_errors", metricType: Counter},
 		PersistenceLatencyPerDomain: {metricName: "persistence_latency_per_domain", metricRollupName: "persistence_latency", metricType: Timer},
+		PersistenceLatencyPerShard: {metricName: "persistence_latency_per_shard", metricType: Timer},
 		PersistenceErrShardExistsCounterPerDomain: {metricName: "persistence_errors_shard_exists_per_domain", metricRollupName: "persistence_errors_shard_exists", metricType: Counter},
 		PersistenceErrShardOwnershipLostCounterPerDomain: {metricName: "persistence_errors_shard_ownership_lost_per_domain", metricRollupName: "persistence_errors_shard_ownership_lost", metricType: Counter},
 		PersistenceErrConditionFailedCounterPerDomain: {metricName: "persistence_errors_condition_failed_per_domain", metricRollupName: "persistence_errors_condition_failed", metricType: Counter},
diff --git a/common/metrics/tags.go b/common/metrics/tags.go
index cda46141fb1..8f990541e6f 100644
--- a/common/metrics/tags.go
+++ b/common/metrics/tags.go
@@ -52,6 +52,7 @@ const (
 	caller = "caller"
 	signalName = "signalName"
 	workflowVersion = "workflow_version"
+	shardID = "shard_id"
 
 	allValue = "all"
 	unknownValue = "_unknown_"
@@ -80,6 +81,11 @@ func metricWithUnknown(key, value string) Tag {
 	return simpleMetric{key: key, value: value}
 }
 
+// ShardIDTag returns a new tag keyed by shard_id for the given shard ID string.
+func ShardIDTag(shardIDStr string) Tag {
+	return metricWithUnknown(shardID, shardIDStr)
+}
+
 // DomainTag returns a new domain tag. For timers, this also ensures that we
 // dual emit the metric with the all tag. If a blank domain is provided then
 // this converts that to an unknown domain.
diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go
index 92e0c809d52..c212ddbe1f0 100644
--- a/common/persistence/persistenceMetricClients.go
+++ b/common/persistence/persistenceMetricClients.go
@@ -22,6 +22,7 @@ package persistence
 
 import (
 	"context"
+	"strconv"
 	"time"
 
 	"github.com/uber/cadence/common/config"
@@ -292,6 +293,34 @@ func (p *persistenceMetricsClientBase) updateErrorMetric(scope int, err error, m
 	}
 }
 
+// callWithDomainAndShardScope runs op and emits persistence metrics under two
+// scopes built from the same scope index: one tagged with domainTag and one
+// tagged with shardIDTag. Request counters and latency timers are recorded on
+// both; the latency histogram (when enabled) and error metrics are recorded on
+// the domain scope only. It returns the error from op unchanged.
+func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op func() error, domainTag metrics.Tag, shardIDTag metrics.Tag) error {
+	domainMetricsScope := p.metricClient.Scope(scope, domainTag)
+	shardMetricsScope := p.metricClient.Scope(scope, shardIDTag)
+
+	domainMetricsScope.IncCounter(metrics.PersistenceRequestsPerDomain)
+	shardMetricsScope.IncCounter(metrics.PersistenceRequestsPerShard)
+
+	before := time.Now()
+	err := op()
+	duration := time.Since(before)
+
+	domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration)
+	shardMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
+
+	if p.enableLatencyHistogramMetrics {
+		domainMetricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration)
+	}
+	if err != nil {
+		p.updateErrorMetricPerDomain(scope, err, domainMetricsScope)
+	}
+	return err
+}
+
 func (p *persistenceMetricsClientBase) call(scope int, op func() error, tags ...metrics.Tag) error {
 	metricsScope := p.metricClient.Scope(scope, tags...)
 	if len(tags) > 0 {
@@ -418,7 +447,7 @@ func (p *workflowExecutionPersistenceClient) UpdateWorkflowExecution(
 		resp, err = p.persistence.UpdateWorkflowExecution(ctx, request)
 		return err
 	}
-	err := p.call(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
+	err := p.callWithDomainAndShardScope(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
 	if err != nil {
 		return nil, err
 	}
diff --git a/service/history/decision/task_handler.go b/service/history/decision/task_handler.go
index ba0b29a7b14..d5cf742334c 100644
--- a/service/history/decision/task_handler.go
+++ b/service/history/decision/task_handler.go
@@ -138,7 +138,6 @@ func (handler *taskHandlerImpl) handleDecisions(
 
 	var results []*decisionResult
 	for _, decision := range decisions {
-
 		result, err := handler.handleDecisionWithResult(ctx, decision)
 		if err != nil || handler.stopProcessing {
 			return nil, err
@@ -214,12 +213,10 @@ func (handler *taskHandlerImpl) handleDecisionScheduleActivity(
 	ctx context.Context,
 	attr *types.ScheduleActivityTaskDecisionAttributes,
 ) (*decisionResult, error) {
-
 	handler.metricsClient.IncCounter(
 		metrics.HistoryRespondDecisionTaskCompletedScope,
 		metrics.DecisionTypeScheduleActivityCounter,
 	)
-
 	executionInfo := handler.mutableState.GetExecutionInfo()
 	domainID := executionInfo.DomainID
 	targetDomainID := domainID
diff --git a/service/history/shard/context.go b/service/history/shard/context.go
index ee6017e0ed8..118e96260aa 100644
--- a/service/history/shard/context.go
+++ b/service/history/shard/context.go
@@ -681,7 +681,6 @@ func (s *contextImpl) UpdateWorkflowExecution(
 	if s.isClosed() {
 		return nil, ErrShardClosed
 	}
-
 	ctx, cancel, err := s.ensureMinContextTimeout(ctx)
 	if err != nil {
 		return nil, err