Skip to content

Commit

Permalink
Add more metrics to async task dispatch (uber#6202)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Jul 31, 2024
1 parent 8fc4d27 commit 38c295d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 43 deletions.
96 changes: 55 additions & 41 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2543,6 +2543,13 @@ const (
TaskLagPerTaskListGauge
TaskBacklogPerTaskListGauge
TaskCountPerTaskListGauge
AsyncMatchLocalPollCounterPerTaskList
AsyncMatchLocalPollLatencyPerTaskList
AsyncMatchForwardPollCounterPerTaskList
AsyncMatchForwardPollLatencyPerTaskList
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList
AsyncMatchAttemptPerTaskList

NumMatchingMetrics
)
Expand Down Expand Up @@ -3156,47 +3163,54 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter},
},
Matching: {
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
PollTimeoutPerTaskListCounter: {metricName: "poll_timeouts_per_tl", metricRollupName: "poll_timeouts"},
PollSuccessWithSyncPerTaskListCounter: {metricName: "poll_success_sync_per_tl", metricRollupName: "poll_success_sync"},
LeaseRequestPerTaskListCounter: {metricName: "lease_requests_per_tl", metricRollupName: "lease_requests"},
LeaseFailurePerTaskListCounter: {metricName: "lease_failures_per_tl", metricRollupName: "lease_failures"},
ConditionFailedErrorPerTaskListCounter: {metricName: "condition_failed_errors_per_tl", metricRollupName: "condition_failed_errors"},
RespondQueryTaskFailedPerTaskListCounter: {metricName: "respond_query_failed_per_tl", metricRollupName: "respond_query_failed"},
SyncThrottlePerTaskListCounter: {metricName: "sync_throttle_count_per_tl", metricRollupName: "sync_throttle_count"},
BufferThrottlePerTaskListCounter: {metricName: "buffer_throttle_count_per_tl", metricRollupName: "buffer_throttle_count"},
BufferUnknownTaskDispatchError: {metricName: "buffer_unknown_task_dispatch_error_per_tl", metricRollupName: "buffer_unknown_task_dispatch_error"},
BufferIsolationGroupRedirectCounter: {metricName: "buffer_isolation_group_redirected_per_tl", metricRollupName: "buffer_isolation_group_redirected"},
BufferIsolationGroupRedirectFailureCounter: {metricName: "buffer_isolation_group_redirect_failure_per_tl", metricRollupName: "buffer_isolation_group_redirect_failure"},
BufferIsolationGroupMisconfiguredCounter: {metricName: "buffer_isolation_group_misconfigured_failure_per_tl", metricRollupName: "buffer_isolation_group_misconfigured_failure"},
ExpiredTasksPerTaskListCounter: {metricName: "tasks_expired_per_tl", metricRollupName: "tasks_expired"},
ForwardedPerTaskListCounter: {metricName: "forwarded_per_tl", metricRollupName: "forwarded"},
ForwardTaskCallsPerTaskList: {metricName: "forward_task_calls_per_tl", metricRollupName: "forward_task_calls"},
ForwardTaskErrorsPerTaskList: {metricName: "forward_task_errors_per_tl", metricRollupName: "forward_task_errors"},
SyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "sync_forward_task_throttle_errors_per_tl", metricRollupName: "sync_forward_task_throttle_errors"},
AsyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "async_forward_task_throttle_errors_per_tl", metricRollupName: "async_forward_task_throttle_errors"},
ForwardQueryCallsPerTaskList: {metricName: "forward_query_calls_per_tl", metricRollupName: "forward_query_calls"},
ForwardQueryErrorsPerTaskList: {metricName: "forward_query_errors_per_tl", metricRollupName: "forward_query_errors"},
ForwardPollCallsPerTaskList: {metricName: "forward_poll_calls_per_tl", metricRollupName: "forward_poll_calls"},
ForwardPollErrorsPerTaskList: {metricName: "forward_poll_errors_per_tl", metricRollupName: "forward_poll_errors"},
SyncMatchLatencyPerTaskList: {metricName: "syncmatch_latency_per_tl", metricRollupName: "syncmatch_latency", metricType: Timer},
AsyncMatchLatencyPerTaskList: {metricName: "asyncmatch_latency_per_tl", metricRollupName: "asyncmatch_latency", metricType: Timer},
AsyncMatchDispatchLatencyPerTaskList: {metricName: "asyncmatch_dispatch_latency_per_tl", metricRollupName: "asyncmatch_dispatch_latency", metricType: Timer},
AsyncMatchDispatchTimeoutCounterPerTaskList: {metricName: "asyncmatch_dispatch_timeouts_per_tl", metricRollupName: "asyncmatch_dispatch_timeouts"},
ForwardTaskLatencyPerTaskList: {metricName: "forward_task_latency_per_tl", metricRollupName: "forward_task_latency"},
ForwardQueryLatencyPerTaskList: {metricName: "forward_query_latency_per_tl", metricRollupName: "forward_query_latency"},
ForwardPollLatencyPerTaskList: {metricName: "forward_poll_latency_per_tl", metricRollupName: "forward_poll_latency"},
LocalToLocalMatchPerTaskListCounter: {metricName: "local_to_local_matches_per_tl", metricRollupName: "local_to_local_matches"},
LocalToRemoteMatchPerTaskListCounter: {metricName: "local_to_remote_matches_per_tl", metricRollupName: "local_to_remote_matches"},
RemoteToLocalMatchPerTaskListCounter: {metricName: "remote_to_local_matches_per_tl", metricRollupName: "remote_to_local_matches"},
RemoteToRemoteMatchPerTaskListCounter: {metricName: "remote_to_remote_matches_per_tl", metricRollupName: "remote_to_remote_matches"},
IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter},
PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"},
PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter},
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge},
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
PollTimeoutPerTaskListCounter: {metricName: "poll_timeouts_per_tl", metricRollupName: "poll_timeouts"},
PollSuccessWithSyncPerTaskListCounter: {metricName: "poll_success_sync_per_tl", metricRollupName: "poll_success_sync"},
LeaseRequestPerTaskListCounter: {metricName: "lease_requests_per_tl", metricRollupName: "lease_requests"},
LeaseFailurePerTaskListCounter: {metricName: "lease_failures_per_tl", metricRollupName: "lease_failures"},
ConditionFailedErrorPerTaskListCounter: {metricName: "condition_failed_errors_per_tl", metricRollupName: "condition_failed_errors"},
RespondQueryTaskFailedPerTaskListCounter: {metricName: "respond_query_failed_per_tl", metricRollupName: "respond_query_failed"},
SyncThrottlePerTaskListCounter: {metricName: "sync_throttle_count_per_tl", metricRollupName: "sync_throttle_count"},
BufferThrottlePerTaskListCounter: {metricName: "buffer_throttle_count_per_tl", metricRollupName: "buffer_throttle_count"},
BufferUnknownTaskDispatchError: {metricName: "buffer_unknown_task_dispatch_error_per_tl", metricRollupName: "buffer_unknown_task_dispatch_error"},
BufferIsolationGroupRedirectCounter: {metricName: "buffer_isolation_group_redirected_per_tl", metricRollupName: "buffer_isolation_group_redirected"},
BufferIsolationGroupRedirectFailureCounter: {metricName: "buffer_isolation_group_redirect_failure_per_tl", metricRollupName: "buffer_isolation_group_redirect_failure"},
BufferIsolationGroupMisconfiguredCounter: {metricName: "buffer_isolation_group_misconfigured_failure_per_tl", metricRollupName: "buffer_isolation_group_misconfigured_failure"},
ExpiredTasksPerTaskListCounter: {metricName: "tasks_expired_per_tl", metricRollupName: "tasks_expired"},
ForwardedPerTaskListCounter: {metricName: "forwarded_per_tl", metricRollupName: "forwarded"},
ForwardTaskCallsPerTaskList: {metricName: "forward_task_calls_per_tl", metricRollupName: "forward_task_calls"},
ForwardTaskErrorsPerTaskList: {metricName: "forward_task_errors_per_tl", metricRollupName: "forward_task_errors"},
SyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "sync_forward_task_throttle_errors_per_tl", metricRollupName: "sync_forward_task_throttle_errors"},
AsyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "async_forward_task_throttle_errors_per_tl", metricRollupName: "async_forward_task_throttle_errors"},
ForwardQueryCallsPerTaskList: {metricName: "forward_query_calls_per_tl", metricRollupName: "forward_query_calls"},
ForwardQueryErrorsPerTaskList: {metricName: "forward_query_errors_per_tl", metricRollupName: "forward_query_errors"},
ForwardPollCallsPerTaskList: {metricName: "forward_poll_calls_per_tl", metricRollupName: "forward_poll_calls"},
ForwardPollErrorsPerTaskList: {metricName: "forward_poll_errors_per_tl", metricRollupName: "forward_poll_errors"},
SyncMatchLatencyPerTaskList: {metricName: "syncmatch_latency_per_tl", metricRollupName: "syncmatch_latency", metricType: Timer},
AsyncMatchLatencyPerTaskList: {metricName: "asyncmatch_latency_per_tl", metricRollupName: "asyncmatch_latency", metricType: Timer},
AsyncMatchDispatchLatencyPerTaskList: {metricName: "asyncmatch_dispatch_latency_per_tl", metricRollupName: "asyncmatch_dispatch_latency", metricType: Timer},
AsyncMatchDispatchTimeoutCounterPerTaskList: {metricName: "asyncmatch_dispatch_timeouts_per_tl", metricRollupName: "asyncmatch_dispatch_timeouts"},
ForwardTaskLatencyPerTaskList: {metricName: "forward_task_latency_per_tl", metricRollupName: "forward_task_latency"},
ForwardQueryLatencyPerTaskList: {metricName: "forward_query_latency_per_tl", metricRollupName: "forward_query_latency"},
ForwardPollLatencyPerTaskList: {metricName: "forward_poll_latency_per_tl", metricRollupName: "forward_poll_latency"},
LocalToLocalMatchPerTaskListCounter: {metricName: "local_to_local_matches_per_tl", metricRollupName: "local_to_local_matches"},
LocalToRemoteMatchPerTaskListCounter: {metricName: "local_to_remote_matches_per_tl", metricRollupName: "local_to_remote_matches"},
RemoteToLocalMatchPerTaskListCounter: {metricName: "remote_to_local_matches_per_tl", metricRollupName: "remote_to_local_matches"},
RemoteToRemoteMatchPerTaskListCounter: {metricName: "remote_to_remote_matches_per_tl", metricRollupName: "remote_to_remote_matches"},
IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter},
PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"},
PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter},
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge},
AsyncMatchLocalPollCounterPerTaskList: {metricName: "asyncmatch_local_poll_per_tl", metricRollupName: "asyncmatch_local_poll"},
AsyncMatchLocalPollLatencyPerTaskList: {metricName: "asyncmatch_local_poll_latency_per_tl", metricRollupName: "asyncmatch_local_poll_latency"},
AsyncMatchForwardPollCounterPerTaskList: {metricName: "asyncmatch_forward_poll_per_tl", metricRollupName: "asyncmatch_forward_poll"},
AsyncMatchForwardPollLatencyPerTaskList: {metricName: "asyncmatch_forward_poll_latency_per_tl", metricRollupName: "asyncmatch_forward_poll_latency"},
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed"},
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_latency_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed_latency"},
AsyncMatchAttemptPerTaskList: {metricName: "asyncmatch_attempt_per_tl", metricRollupName: "asyncmatch_attempt"},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
17 changes: 15 additions & 2 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,28 +221,34 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error
return fmt.Errorf("rate limit error dispatching: %w", err)
}

startT := time.Now()
// attempt a match with local poller first. When that
// doesn't succeed, try both local match and remote match
taskC := tm.getTaskC(task)
select {
case taskC <- task: // poller picked up the task
tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
return nil
case <-ctx.Done():
return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err())
default:
}

attempt := 0
forLoop:
for {
select {
case taskC <- task: // poller picked up the task
tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
return nil
case token := <-tm.fwdrAddReqTokenC():
childCtx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*2))
childCtx, cancel := context.WithTimeout(ctx, time.Second*2)
err := tm.fwdr.ForwardTask(childCtx, task)
token.release("")
if err != nil {

if errors.Is(err, ErrForwarderSlowDown) {
tm.scope.IncCounter(metrics.AsyncMatchForwardTaskThrottleErrorPerTasklist)
}
Expand All @@ -257,16 +263,23 @@ forLoop:
select {
case taskC <- task: // poller picked up the task
cancel()
tm.scope.IncCounter(metrics.AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList, time.Since(startT))
return nil
case <-childCtx.Done():
case <-ctx.Done():
cancel()
return fmt.Errorf("failed to dispatch after failing to forward task: %w", ctx.Err())
}
cancel()
attempt++
continue forLoop
}
cancel()
tm.scope.IncCounter(metrics.AsyncMatchForwardPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchForwardPollLatencyPerTaskList, time.Since(startT))
// at this point, we forwarded the task to a parent partition which
// in turn dispatched the task to a poller. Make sure we delete the
// task from the database
Expand Down
4 changes: 4 additions & 0 deletions service/matching/tasklist/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/yarpc"
Expand Down Expand Up @@ -494,6 +495,9 @@ func (t *MatcherTestSuite) TestMustOfferRemoteRateLimit() {
t.matcher.scope = &scope
completionFunc := func(*persistence.TaskInfo, error) {}
for i := 0; i < 5; i++ {
scope.On("IncCounter", metrics.AsyncMatchForwardPollCounterPerTaskList)
scope.On("RecordTimer", metrics.AsyncMatchAttemptPerTaskList, mock.Anything)
scope.On("RecordTimer", metrics.AsyncMatchForwardPollLatencyPerTaskList, mock.Anything)
t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Return(nil)
task := newInternalTask(t.newTaskInfo(), completionFunc, types.TaskSourceDbBacklog, "", false, nil, "")
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
Expand Down

0 comments on commit 38c295d

Please sign in to comment.