diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 8d443672d3b..32ef2fd126a 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2543,6 +2543,13 @@ const ( TaskLagPerTaskListGauge TaskBacklogPerTaskListGauge TaskCountPerTaskListGauge + AsyncMatchLocalPollCounterPerTaskList + AsyncMatchLocalPollLatencyPerTaskList + AsyncMatchForwardPollCounterPerTaskList + AsyncMatchForwardPollLatencyPerTaskList + AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList + AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList + AsyncMatchAttemptPerTaskList NumMatchingMetrics ) @@ -3156,47 +3163,54 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter}, }, Matching: { - PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"}, - PollTimeoutPerTaskListCounter: {metricName: "poll_timeouts_per_tl", metricRollupName: "poll_timeouts"}, - PollSuccessWithSyncPerTaskListCounter: {metricName: "poll_success_sync_per_tl", metricRollupName: "poll_success_sync"}, - LeaseRequestPerTaskListCounter: {metricName: "lease_requests_per_tl", metricRollupName: "lease_requests"}, - LeaseFailurePerTaskListCounter: {metricName: "lease_failures_per_tl", metricRollupName: "lease_failures"}, - ConditionFailedErrorPerTaskListCounter: {metricName: "condition_failed_errors_per_tl", metricRollupName: "condition_failed_errors"}, - RespondQueryTaskFailedPerTaskListCounter: {metricName: "respond_query_failed_per_tl", metricRollupName: "respond_query_failed"}, - SyncThrottlePerTaskListCounter: {metricName: "sync_throttle_count_per_tl", metricRollupName: "sync_throttle_count"}, - BufferThrottlePerTaskListCounter: {metricName: "buffer_throttle_count_per_tl", metricRollupName: "buffer_throttle_count"}, - BufferUnknownTaskDispatchError: {metricName: "buffer_unknown_task_dispatch_error_per_tl", metricRollupName: "buffer_unknown_task_dispatch_error"}, - BufferIsolationGroupRedirectCounter: {metricName: "buffer_isolation_group_redirected_per_tl", metricRollupName: "buffer_isolation_group_redirected"}, - BufferIsolationGroupRedirectFailureCounter: {metricName: "buffer_isolation_group_redirect_failure_per_tl", metricRollupName: "buffer_isolation_group_redirect_failure"}, - BufferIsolationGroupMisconfiguredCounter: {metricName: "buffer_isolation_group_misconfigured_failure_per_tl", metricRollupName: "buffer_isolation_group_misconfigured_failure"}, - ExpiredTasksPerTaskListCounter: {metricName: "tasks_expired_per_tl", metricRollupName: "tasks_expired"}, - ForwardedPerTaskListCounter: {metricName: "forwarded_per_tl", metricRollupName: "forwarded"}, - ForwardTaskCallsPerTaskList: {metricName: "forward_task_calls_per_tl", metricRollupName: "forward_task_calls"}, - ForwardTaskErrorsPerTaskList: {metricName: "forward_task_errors_per_tl", metricRollupName: "forward_task_errors"}, - SyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "sync_forward_task_throttle_errors_per_tl", metricRollupName: "sync_forward_task_throttle_errors"}, - AsyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "async_forward_task_throttle_errors_per_tl", metricRollupName: "async_forward_task_throttle_errors"}, - ForwardQueryCallsPerTaskList: {metricName: "forward_query_calls_per_tl", metricRollupName: "forward_query_calls"}, - ForwardQueryErrorsPerTaskList: {metricName: "forward_query_errors_per_tl", metricRollupName: "forward_query_errors"}, - ForwardPollCallsPerTaskList: {metricName: "forward_poll_calls_per_tl", metricRollupName: "forward_poll_calls"}, - ForwardPollErrorsPerTaskList: {metricName: "forward_poll_errors_per_tl", metricRollupName: "forward_poll_errors"}, - SyncMatchLatencyPerTaskList: {metricName: "syncmatch_latency_per_tl", metricRollupName: "syncmatch_latency", metricType: Timer}, - AsyncMatchLatencyPerTaskList: {metricName: "asyncmatch_latency_per_tl", metricRollupName: "asyncmatch_latency", metricType: Timer}, - AsyncMatchDispatchLatencyPerTaskList: {metricName: "asyncmatch_dispatch_latency_per_tl", metricRollupName: "asyncmatch_dispatch_latency", metricType: Timer}, - AsyncMatchDispatchTimeoutCounterPerTaskList: {metricName: "asyncmatch_dispatch_timeouts_per_tl", metricRollupName: "asyncmatch_dispatch_timeouts"}, - ForwardTaskLatencyPerTaskList: {metricName: "forward_task_latency_per_tl", metricRollupName: "forward_task_latency"}, - ForwardQueryLatencyPerTaskList: {metricName: "forward_query_latency_per_tl", metricRollupName: "forward_query_latency"}, - ForwardPollLatencyPerTaskList: {metricName: "forward_poll_latency_per_tl", metricRollupName: "forward_poll_latency"}, - LocalToLocalMatchPerTaskListCounter: {metricName: "local_to_local_matches_per_tl", metricRollupName: "local_to_local_matches"}, - LocalToRemoteMatchPerTaskListCounter: {metricName: "local_to_remote_matches_per_tl", metricRollupName: "local_to_remote_matches"}, - RemoteToLocalMatchPerTaskListCounter: {metricName: "remote_to_local_matches_per_tl", metricRollupName: "remote_to_local_matches"}, - RemoteToRemoteMatchPerTaskListCounter: {metricName: "remote_to_remote_matches_per_tl", metricRollupName: "remote_to_remote_matches"}, - IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter}, - PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"}, - PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter}, - TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge}, - TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge}, - TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge}, - TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge}, + PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"}, + PollTimeoutPerTaskListCounter: {metricName: "poll_timeouts_per_tl", metricRollupName: "poll_timeouts"}, + PollSuccessWithSyncPerTaskListCounter: {metricName: "poll_success_sync_per_tl", metricRollupName: "poll_success_sync"}, + LeaseRequestPerTaskListCounter: {metricName: "lease_requests_per_tl", metricRollupName: "lease_requests"}, + LeaseFailurePerTaskListCounter: {metricName: "lease_failures_per_tl", metricRollupName: "lease_failures"}, + ConditionFailedErrorPerTaskListCounter: {metricName: "condition_failed_errors_per_tl", metricRollupName: "condition_failed_errors"}, + RespondQueryTaskFailedPerTaskListCounter: {metricName: "respond_query_failed_per_tl", metricRollupName: "respond_query_failed"}, + SyncThrottlePerTaskListCounter: {metricName: "sync_throttle_count_per_tl", metricRollupName: "sync_throttle_count"}, + BufferThrottlePerTaskListCounter: {metricName: "buffer_throttle_count_per_tl", metricRollupName: "buffer_throttle_count"}, + BufferUnknownTaskDispatchError: {metricName: "buffer_unknown_task_dispatch_error_per_tl", metricRollupName: "buffer_unknown_task_dispatch_error"}, + BufferIsolationGroupRedirectCounter: {metricName: "buffer_isolation_group_redirected_per_tl", metricRollupName: "buffer_isolation_group_redirected"}, + BufferIsolationGroupRedirectFailureCounter: {metricName: "buffer_isolation_group_redirect_failure_per_tl", metricRollupName: "buffer_isolation_group_redirect_failure"}, + BufferIsolationGroupMisconfiguredCounter: {metricName: "buffer_isolation_group_misconfigured_failure_per_tl", metricRollupName: "buffer_isolation_group_misconfigured_failure"}, + ExpiredTasksPerTaskListCounter: {metricName: "tasks_expired_per_tl", metricRollupName: "tasks_expired"}, + ForwardedPerTaskListCounter: {metricName: "forwarded_per_tl", metricRollupName: "forwarded"}, + ForwardTaskCallsPerTaskList: {metricName: "forward_task_calls_per_tl", metricRollupName: "forward_task_calls"}, + ForwardTaskErrorsPerTaskList: {metricName: "forward_task_errors_per_tl", metricRollupName: "forward_task_errors"}, + SyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "sync_forward_task_throttle_errors_per_tl", metricRollupName: "sync_forward_task_throttle_errors"}, + AsyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "async_forward_task_throttle_errors_per_tl", metricRollupName: "async_forward_task_throttle_errors"}, + ForwardQueryCallsPerTaskList: {metricName: "forward_query_calls_per_tl", metricRollupName: "forward_query_calls"}, + ForwardQueryErrorsPerTaskList: {metricName: "forward_query_errors_per_tl", metricRollupName: "forward_query_errors"}, + ForwardPollCallsPerTaskList: {metricName: "forward_poll_calls_per_tl", metricRollupName: "forward_poll_calls"}, + ForwardPollErrorsPerTaskList: {metricName: "forward_poll_errors_per_tl", metricRollupName: "forward_poll_errors"}, + SyncMatchLatencyPerTaskList: {metricName: "syncmatch_latency_per_tl", metricRollupName: "syncmatch_latency", metricType: Timer}, + AsyncMatchLatencyPerTaskList: {metricName: "asyncmatch_latency_per_tl", metricRollupName: "asyncmatch_latency", metricType: Timer}, + AsyncMatchDispatchLatencyPerTaskList: {metricName: "asyncmatch_dispatch_latency_per_tl", metricRollupName: "asyncmatch_dispatch_latency", metricType: Timer}, + AsyncMatchDispatchTimeoutCounterPerTaskList: {metricName: "asyncmatch_dispatch_timeouts_per_tl", metricRollupName: "asyncmatch_dispatch_timeouts"}, + ForwardTaskLatencyPerTaskList: {metricName: "forward_task_latency_per_tl", metricRollupName: "forward_task_latency"}, + ForwardQueryLatencyPerTaskList: {metricName: "forward_query_latency_per_tl", metricRollupName: "forward_query_latency"}, + ForwardPollLatencyPerTaskList: {metricName: "forward_poll_latency_per_tl", metricRollupName: "forward_poll_latency"}, + LocalToLocalMatchPerTaskListCounter: {metricName: "local_to_local_matches_per_tl", metricRollupName: "local_to_local_matches"}, + LocalToRemoteMatchPerTaskListCounter: {metricName: "local_to_remote_matches_per_tl", metricRollupName: "local_to_remote_matches"}, + RemoteToLocalMatchPerTaskListCounter: {metricName: "remote_to_local_matches_per_tl", metricRollupName: "remote_to_local_matches"}, + RemoteToRemoteMatchPerTaskListCounter: {metricName: "remote_to_remote_matches_per_tl", metricRollupName: "remote_to_remote_matches"}, + IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter}, + PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"}, + PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter}, + TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge}, + TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge}, + TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge}, + TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge}, + AsyncMatchLocalPollCounterPerTaskList: {metricName: "asyncmatch_local_poll_per_tl", metricRollupName: "asyncmatch_local_poll"}, + AsyncMatchLocalPollLatencyPerTaskList: {metricName: "asyncmatch_local_poll_latency_per_tl", metricRollupName: "asyncmatch_local_poll_latency"}, + AsyncMatchForwardPollCounterPerTaskList: {metricName: "asyncmatch_forward_poll_per_tl", metricRollupName: "asyncmatch_forward_poll"}, + AsyncMatchForwardPollLatencyPerTaskList: {metricName: "asyncmatch_forward_poll_latency_per_tl", metricRollupName: "asyncmatch_forward_poll_latency"}, + AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed"}, + AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_latency_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed_latency"}, + AsyncMatchAttemptPerTaskList: {metricName: "asyncmatch_attempt_per_tl", metricRollupName: "asyncmatch_attempt"}, }, Worker: { ReplicatorMessages: {metricName: "replicator_messages"}, diff --git a/service/matching/tasklist/matcher.go b/service/matching/tasklist/matcher.go index 0d0b78afa7c..243a0b36d84 100644 --- a/service/matching/tasklist/matcher.go +++ b/service/matching/tasklist/matcher.go @@ -221,28 +221,34 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error return fmt.Errorf("rate limit error dispatching: %w", err) } + startT := time.Now() // attempt a match with local poller first. When that // doesn't succeed, try both local match and remote match taskC := tm.getTaskC(task) select { case taskC <- task: // poller picked up the task + tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList) + tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT)) return nil case <-ctx.Done(): return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err()) default: } + attempt := 0 forLoop: for { select { case taskC <- task: // poller picked up the task + tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList) + tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt)) + tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT)) return nil case token := <-tm.fwdrAddReqTokenC(): - childCtx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*2)) + childCtx, cancel := context.WithTimeout(ctx, time.Second*2) err := tm.fwdr.ForwardTask(childCtx, task) token.release("") if err != nil { - if errors.Is(err, ErrForwarderSlowDown) { tm.scope.IncCounter(metrics.AsyncMatchForwardTaskThrottleErrorPerTasklist) } @@ -257,6 +263,9 @@ forLoop: select { case taskC <- task: // poller picked up the task cancel() + tm.scope.IncCounter(metrics.AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList) + tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt)) + tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList, time.Since(startT)) return nil case <-childCtx.Done(): case <-ctx.Done(): @@ -264,9 +273,13 @@ forLoop: return fmt.Errorf("failed to dispatch after failing to forward task: %w", ctx.Err()) } cancel() + attempt++ continue forLoop } cancel() + tm.scope.IncCounter(metrics.AsyncMatchForwardPollCounterPerTaskList) + tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt)) + tm.scope.RecordTimer(metrics.AsyncMatchForwardPollLatencyPerTaskList, time.Since(startT)) // at this point, we forwarded the task to a parent partition which // in turn dispatched the task to a poller. Make sure we delete the // task from the database diff --git a/service/matching/tasklist/matcher_test.go b/service/matching/tasklist/matcher_test.go index b2d6cab79fc..cd8f6a788f8 100644 --- a/service/matching/tasklist/matcher_test.go +++ b/service/matching/tasklist/matcher_test.go @@ -31,6 +31,7 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.uber.org/yarpc" @@ -494,6 +495,9 @@ func (t *MatcherTestSuite) TestMustOfferRemoteRateLimit() { t.matcher.scope = &scope completionFunc := func(*persistence.TaskInfo, error) {} for i := 0; i < 5; i++ { + scope.On("IncCounter", metrics.AsyncMatchForwardPollCounterPerTaskList) + scope.On("RecordTimer", metrics.AsyncMatchAttemptPerTaskList, mock.Anything) + scope.On("RecordTimer", metrics.AsyncMatchForwardPollLatencyPerTaskList, mock.Anything) t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Return(nil) task := newInternalTask(t.newTaskInfo(), completionFunc, types.TaskSourceDbBacklog, "", false, nil, "") ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)