Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Activity E2E latency metrics #3108

Merged
merged 13 commits into from
Mar 13, 2020
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,7 @@ const (
TransferTaskThrottledCounter
TimerTaskThrottledCounter

ActivityE2ELatency
AckLevelUpdateCounter
AckLevelUpdateFailedCounter
DecisionTypeScheduleActivityCounter
Expand Down Expand Up @@ -1970,6 +1971,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TransferTaskThrottledCounter: {metricName: "transfer_task_throttled_counter", metricType: Counter},
TimerTaskThrottledCounter: {metricName: "timer_task_throttled_counter", metricType: Counter},
ActivityE2ELatency: {metricName: "activity_end_to_end_latency", metricType: Timer},
AckLevelUpdateCounter: {metricName: "ack_level_update", metricType: Counter},
AckLevelUpdateFailedCounter: {metricName: "ack_level_update_failed", metricType: Counter},
DecisionTypeScheduleActivityCounter: {metricName: "schedule_activity_decision", metricType: Counter},
Expand Down
25 changes: 21 additions & 4 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,8 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(
RunId: common.StringPtr(token.RunID),
}

return e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
var activityStartedTime time.Time
err = e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
func(context workflowExecutionContext, mutableState mutableState) error {
if !mutableState.IsWorkflowExecutionRunning() {
return ErrWorkflowCompleted
Expand Down Expand Up @@ -1492,8 +1493,13 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(
// Unable to add ActivityTaskCompleted event to history
return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCompleted event to history."}
}
activityStartedTime = ai.StartedTime
return nil
})
if err == nil && !activityStartedTime.IsZero() {
e.metricsClient.RecordTimer(metrics.HistoryRespondActivityTaskCompletedScope, metrics.ActivityE2ELatency, time.Since(activityStartedTime))
vancexu marked this conversation as resolved.
Show resolved Hide resolved
}
return err
}

// RespondActivityTaskFailed completes an activity task failure.
Expand All @@ -1519,7 +1525,8 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
RunId: common.StringPtr(token.RunID),
}

return e.updateWorkflowExecutionWithAction(ctx, domainID, workflowExecution,
var activityStartedTime time.Time
err = e.updateWorkflowExecutionWithAction(ctx, domainID, workflowExecution,
func(context workflowExecutionContext, mutableState mutableState) (*updateWorkflowAction, error) {
if !mutableState.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
Expand Down Expand Up @@ -1560,8 +1567,13 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
postActions.createDecision = true
}

activityStartedTime = ai.StartedTime
return postActions, nil
})
if err == nil && !activityStartedTime.IsZero() {
e.metricsClient.RecordTimer(metrics.HistoryRespondActivityTaskFailedScope, metrics.ActivityE2ELatency, time.Since(activityStartedTime))
}
return err
}

// RespondActivityTaskCanceled completes an activity task failure.
Expand All @@ -1587,7 +1599,8 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
RunId: common.StringPtr(token.RunID),
}

return e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
var activityStartedTime time.Time
err = e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
func(context workflowExecutionContext, mutableState mutableState) error {
if !mutableState.IsWorkflowExecutionRunning() {
return ErrWorkflowCompleted
Expand Down Expand Up @@ -1624,9 +1637,13 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCanceled event to history."}
}

activityStartedTime = ai.StartedTime
return nil
})

if err == nil && !activityStartedTime.IsZero() {
e.metricsClient.RecordTimer(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.ActivityE2ELatency, time.Since(activityStartedTime))
}
return err
}

// RecordActivityTaskHeartbeat records an hearbeat for a task.
Expand Down