Skip to content

Commit

Permalink
Add long running workflow metrics (uber#4643)
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored Nov 24, 2021
1 parent 770e9ec commit 80700d8
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 81 deletions.
2 changes: 2 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ const (
DefaultESAnalyzerLimitToTypes = ""
// DefaultESAnalyzerLimitToDomains controls if we want to limit ESAnalyzer only to some domains
DefaultESAnalyzerLimitToDomains = ""
// DefaultESAnalyzerWorkflowDurationWarnThreshold defines warning threshold for a workflow duration
DefaultESAnalyzerWorkflowDurationWarnThresholds = ""
)

// StickyTaskConditionFailedErrorMsg error msg for sticky task ConditionFailedError
Expand Down
24 changes: 15 additions & 9 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,11 @@ const (
// Value type: Int
// Default value: "" => means no limitation
ESAnalyzerLimitToDomains
// ESAnalyzerWorkflowDurationWarnThresholds defines the warning execution thresholds for workflow types
// KeyName: worker.ESAnalyzerWorkflowDurationWarnThresholds
// Value type: string (json of a dictionary {"<domainName>/<workflowType>":<value>,...})
// Default value: ""
ESAnalyzerWorkflowDurationWarnThresholds

// LastKeyForTest must be the last one in this const group for testing purpose
LastKeyForTest
Expand Down Expand Up @@ -2454,15 +2459,16 @@ var Keys = map[Key]string{
WorkerDeterministicConstructionCheckProbability: "worker.DeterministicConstructionCheckProbability",
WorkerBlobIntegrityCheckProbability: "worker.BlobIntegrityCheckProbability",

ESAnalyzerPause: "worker.ESAnalyzerPause",
ESAnalyzerTimeWindow: "worker.ESAnalyzerTimeWindow",
ESAnalyzerMaxNumDomains: "worker.ESAnalyzerMaxNumDomains",
ESAnalyzerMaxNumWorkflowTypes: "worker.ESAnalyzerMaxNumWorkflowTypes",
ESAnalyzerNumWorkflowsToRefresh: "worker.ESAnalyzerNumWorkflowsToRefresh",
ESAnalyzerBufferWaitTime: "worker.ESAnalyzerBufferWaitTime",
ESAnalyzerMinNumWorkflowsForAvg: "worker.ESAnalyzerMinNumWorkflowsForAvg",
ESAnalyzerLimitToTypes: "worker.ESAnalyzerLimitToTypes",
ESAnalyzerLimitToDomains: "worker.ESAnalyzerLimitToDomains",
ESAnalyzerPause: "worker.ESAnalyzerPause",
ESAnalyzerTimeWindow: "worker.ESAnalyzerTimeWindow",
ESAnalyzerMaxNumDomains: "worker.ESAnalyzerMaxNumDomains",
ESAnalyzerMaxNumWorkflowTypes: "worker.ESAnalyzerMaxNumWorkflowTypes",
ESAnalyzerNumWorkflowsToRefresh: "worker.ESAnalyzerNumWorkflowsToRefresh",
ESAnalyzerBufferWaitTime: "worker.ESAnalyzerBufferWaitTime",
ESAnalyzerMinNumWorkflowsForAvg: "worker.ESAnalyzerMinNumWorkflowsForAvg",
ESAnalyzerLimitToTypes: "worker.ESAnalyzerLimitToTypes",
ESAnalyzerLimitToDomains: "worker.ESAnalyzerLimitToDomains",
ESAnalyzerWorkflowDurationWarnThresholds: "worker.ESAnalyzerWorkflowDurationWarnThresholds",
}

var KeyNames map[string]Key
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,7 @@ const (
ESAnalyzerNumStuckWorkflowsDiscovered
ESAnalyzerNumStuckWorkflowsRefreshed
ESAnalyzerNumStuckWorkflowsFailedToRefresh
ESAnalyzerNumLongRunningWorkflows

NumWorkerMetrics
)
Expand Down Expand Up @@ -2766,6 +2767,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ESAnalyzerNumStuckWorkflowsDiscovered: {metricName: "es_analyzer_num_stuck_workflows_discovered", metricType: Counter},
ESAnalyzerNumStuckWorkflowsRefreshed: {metricName: "es_analyzer_num_stuck_workflows_refreshed", metricType: Counter},
ESAnalyzerNumStuckWorkflowsFailedToRefresh: {metricName: "es_analyzer_num_stuck_workflows_failed_to_refresh", metricType: Counter},
ESAnalyzerNumLongRunningWorkflows: {metricName: "es_analyzer_num_long_running_workflows", metricType: Counter},
},
}

Expand Down
27 changes: 16 additions & 11 deletions service/worker/esanalyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type (
clientBean client.Bean
esClient es.GenericClient
logger log.Logger
metricsClient metrics.Client
scopedMetricClient metrics.Scope
tallyScope tally.Scope
visibilityIndexName string
resource resource.Resource
Expand All @@ -63,15 +63,16 @@ type (

// Config contains all configs for ElasticSearch Analyzer
Config struct {
ESAnalyzerPause dynamicconfig.BoolPropertyFn
ESAnalyzerTimeWindow dynamicconfig.DurationPropertyFn
ESAnalyzerMaxNumDomains dynamicconfig.IntPropertyFn
ESAnalyzerMaxNumWorkflowTypes dynamicconfig.IntPropertyFn
ESAnalyzerLimitToTypes dynamicconfig.StringPropertyFn
ESAnalyzerLimitToDomains dynamicconfig.StringPropertyFn
ESAnalyzerNumWorkflowsToRefresh dynamicconfig.IntPropertyFnWithWorkflowTypeFilter
ESAnalyzerBufferWaitTime dynamicconfig.DurationPropertyFnWithWorkflowTypeFilter
ESAnalyzerMinNumWorkflowsForAvg dynamicconfig.IntPropertyFnWithWorkflowTypeFilter
ESAnalyzerPause dynamicconfig.BoolPropertyFn
ESAnalyzerTimeWindow dynamicconfig.DurationPropertyFn
ESAnalyzerMaxNumDomains dynamicconfig.IntPropertyFn
ESAnalyzerMaxNumWorkflowTypes dynamicconfig.IntPropertyFn
ESAnalyzerLimitToTypes dynamicconfig.StringPropertyFn
ESAnalyzerLimitToDomains dynamicconfig.StringPropertyFn
ESAnalyzerNumWorkflowsToRefresh dynamicconfig.IntPropertyFnWithWorkflowTypeFilter
ESAnalyzerBufferWaitTime dynamicconfig.DurationPropertyFnWithWorkflowTypeFilter
ESAnalyzerMinNumWorkflowsForAvg dynamicconfig.IntPropertyFnWithWorkflowTypeFilter
ESAnalyzerWorkflowDurationWarnThresholds dynamicconfig.StringPropertyFn
}
)

Expand All @@ -97,7 +98,7 @@ func New(
clientBean: clientBean,
esClient: esClient,
logger: logger,
metricsClient: metricsClient,
scopedMetricClient: getScopedMetricsClient(metricsClient),
tallyScope: tallyScope,
visibilityIndexName: esConfig.Indices[common.VisibilityAppName],
resource: resource,
Expand All @@ -106,6 +107,10 @@ func New(
}
}

func getScopedMetricsClient(metricsClient metrics.Client) metrics.Scope {
return metricsClient.Scope(metrics.ESAnalyzerScope)
}

// Start starts the scanner
func (a *Analyzer) Start() error {
ctx := context.Background()
Expand Down
116 changes: 77 additions & 39 deletions service/worker/esanalyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,25 @@ import (
type esanalyzerWorkflowTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
activityEnv *testsuite.TestActivityEnvironment
workflowEnv *testsuite.TestWorkflowEnvironment
controller *gomock.Controller
resource *resource.Test
mockAdminClient *admin.MockClient
mockDomainCache *cache.MockDomainCache
clientBean *client.MockBean
logger *log.MockLogger
mockMetricClient *mocks.Client
mockESClient *esMocks.GenericClient
analyzer *Analyzer
workflow *Workflow
config Config
DomainID string
DomainName string
WorkflowType string
WorkflowID string
RunID string
activityEnv *testsuite.TestActivityEnvironment
workflowEnv *testsuite.TestWorkflowEnvironment
controller *gomock.Controller
resource *resource.Test
mockAdminClient *admin.MockClient
mockDomainCache *cache.MockDomainCache
clientBean *client.MockBean
logger *log.MockLogger
mockMetricClient *mocks.Client
scopedMetricClient *mocks.Scope
mockESClient *esMocks.GenericClient
analyzer *Analyzer
workflow *Workflow
config Config
DomainID string
DomainName string
WorkflowType string
WorkflowID string
RunID string
}

func TestESAnalyzerWorkflowTestSuite(t *testing.T) {
Expand Down Expand Up @@ -101,15 +102,16 @@ func (s *esanalyzerWorkflowTestSuite) SetupTest() {
)

s.config = Config{
ESAnalyzerPause: dynamicconfig.GetBoolPropertyFn(false),
ESAnalyzerTimeWindow: dynamicconfig.GetDurationPropertyFn(time.Hour * 24 * 30),
ESAnalyzerMaxNumDomains: dynamicconfig.GetIntPropertyFn(500),
ESAnalyzerMaxNumWorkflowTypes: dynamicconfig.GetIntPropertyFn(100),
ESAnalyzerLimitToTypes: dynamicconfig.GetStringPropertyFn(""),
ESAnalyzerLimitToDomains: dynamicconfig.GetStringPropertyFn(""),
ESAnalyzerNumWorkflowsToRefresh: dynamicconfig.GetIntPropertyFilteredByWorkflowType(2),
ESAnalyzerBufferWaitTime: dynamicconfig.GetDurationPropertyFilteredByWorkflowType(time.Minute * 30),
ESAnalyzerMinNumWorkflowsForAvg: dynamicconfig.GetIntPropertyFilteredByWorkflowType(100),
ESAnalyzerPause: dynamicconfig.GetBoolPropertyFn(false),
ESAnalyzerTimeWindow: dynamicconfig.GetDurationPropertyFn(time.Hour * 24 * 30),
ESAnalyzerMaxNumDomains: dynamicconfig.GetIntPropertyFn(500),
ESAnalyzerMaxNumWorkflowTypes: dynamicconfig.GetIntPropertyFn(100),
ESAnalyzerLimitToTypes: dynamicconfig.GetStringPropertyFn(""),
ESAnalyzerLimitToDomains: dynamicconfig.GetStringPropertyFn(""),
ESAnalyzerNumWorkflowsToRefresh: dynamicconfig.GetIntPropertyFilteredByWorkflowType(2),
ESAnalyzerBufferWaitTime: dynamicconfig.GetDurationPropertyFilteredByWorkflowType(time.Minute * 30),
ESAnalyzerMinNumWorkflowsForAvg: dynamicconfig.GetIntPropertyFilteredByWorkflowType(100),
ESAnalyzerWorkflowDurationWarnThresholds: dynamicconfig.GetStringPropertyFn(""),
}

s.activityEnv = s.NewTestActivityEnvironment()
Expand All @@ -121,21 +123,25 @@ func (s *esanalyzerWorkflowTestSuite) SetupTest() {
s.clientBean = client.NewMockBean(s.controller)
s.logger = &log.MockLogger{}
s.mockMetricClient = &mocks.Client{}
s.scopedMetricClient = &mocks.Scope{}
s.mockESClient = &esMocks.GenericClient{}

s.mockMetricClient.On("Scope", metrics.ESAnalyzerScope, mock.Anything).Return(s.scopedMetricClient).Once()
s.scopedMetricClient.On("Tagged", mock.Anything, mock.Anything).Return(s.scopedMetricClient).Once()

s.mockDomainCache.EXPECT().GetDomainByID(s.DomainID).Return(activeDomainCache, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(s.DomainName).Return(activeDomainCache, nil).AnyTimes()
s.clientBean.EXPECT().GetRemoteAdminClient(cluster.TestCurrentClusterName).Return(s.mockAdminClient).AnyTimes()

// SET UP ANALYZER
s.analyzer = &Analyzer{
svcClient: s.resource.GetSDKClient(),
clientBean: s.clientBean,
domainCache: s.mockDomainCache,
logger: s.logger,
metricsClient: s.mockMetricClient,
esClient: s.mockESClient,
config: &s.config,
svcClient: s.resource.GetSDKClient(),
clientBean: s.clientBean,
domainCache: s.mockDomainCache,
logger: s.logger,
scopedMetricClient: getScopedMetricsClient(s.mockMetricClient),
esClient: s.mockESClient,
config: &s.config,
}
s.activityEnv.SetTestTimeout(time.Second * 5)
s.activityEnv.SetWorkerOptions(worker.Options{BackgroundActivityContext: context.Background()})
Expand All @@ -156,6 +162,10 @@ func (s *esanalyzerWorkflowTestSuite) SetupTest() {
s.workflow.refreshStuckWorkflowsFromSameWorkflowType,
activity.RegisterOptions{Name: refreshStuckWorkflowsActivity},
)
s.workflowEnv.RegisterActivityWithOptions(
s.workflow.findLongRunningWorkflows,
activity.RegisterOptions{Name: findLongRunningWorkflowsActivity},
)

s.activityEnv.RegisterActivityWithOptions(
s.workflow.getWorkflowTypes,
Expand All @@ -167,6 +177,10 @@ func (s *esanalyzerWorkflowTestSuite) SetupTest() {
s.workflow.refreshStuckWorkflowsFromSameWorkflowType,
activity.RegisterOptions{Name: refreshStuckWorkflowsActivity},
)
s.activityEnv.RegisterActivityWithOptions(
s.workflow.findLongRunningWorkflows,
activity.RegisterOptions{Name: findLongRunningWorkflowsActivity},
)
}

func (s *esanalyzerWorkflowTestSuite) TearDownTest() {
Expand Down Expand Up @@ -196,6 +210,8 @@ func (s *esanalyzerWorkflowTestSuite) TestExecuteWorkflow() {
}
s.workflowEnv.OnActivity(findStuckWorkflowsActivity, mock.Anything, workflowTypeInfos[0]).
Return(workflows, nil).Times(1)
s.workflowEnv.OnActivity(findLongRunningWorkflowsActivity, mock.Anything).
Return(nil).Times(1)

s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows).Return(nil).Times(1)

Expand Down Expand Up @@ -238,6 +254,8 @@ func (s *esanalyzerWorkflowTestSuite) TestExecuteWorkflowMultipleWorkflowTypes()
Return(workflows1, nil).Times(1)
s.workflowEnv.OnActivity(findStuckWorkflowsActivity, mock.Anything, workflowTypeInfos[1]).
Return(workflows2, nil).Times(1)
s.workflowEnv.OnActivity(findLongRunningWorkflowsActivity, mock.Anything).
Return(nil).Times(1)

s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows1).Return(nil).Times(1)
s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows2).Return(nil).Times(1)
Expand All @@ -264,7 +282,7 @@ func (s *esanalyzerWorkflowTestSuite) TestRefreshStuckWorkflowsFromSameWorkflowT
},
}).Return(nil).Times(1)
s.logger.On("Info", "Refreshed stuck workflow", mock.Anything).Return().Once()
s.mockMetricClient.On("IncCounter", metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Once()
s.scopedMetricClient.On("IncCounter", metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Once()

_, err := s.activityEnv.ExecuteActivity(s.workflow.refreshStuckWorkflowsFromSameWorkflowType, workflows)
s.NoError(err)
Expand Down Expand Up @@ -298,7 +316,7 @@ func (s *esanalyzerWorkflowTestSuite) TestRefreshStuckWorkflowsFromSameWorkflowT
expectedRunIDs[request.Execution.RunID] = true
}).Times(2)
s.logger.On("Info", "Refreshed stuck workflow", mock.Anything).Return().Times(2)
s.mockMetricClient.On("IncCounter", metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Times(2)
s.scopedMetricClient.On("IncCounter", metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Times(2)

_, err := s.activityEnv.ExecuteActivity(s.workflow.refreshStuckWorkflowsFromSameWorkflowType, workflows)
s.NoError(err)
Expand Down Expand Up @@ -341,13 +359,34 @@ func (s *esanalyzerWorkflowTestSuite) TestRefreshStuckWorkflowsFromSameWorkflowI
expectedRunIDs[request.Execution.RunID] = true
}).Times(1)
s.logger.On("Info", "Refreshed stuck workflow", mock.Anything).Return().Times(2)
s.mockMetricClient.On("IncCounter", metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Times(2)
s.scopedMetricClient.On("IncCounter", metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Times(2)

_, err := s.activityEnv.ExecuteActivity(s.workflow.refreshStuckWorkflowsFromSameWorkflowType, workflows)
s.Error(err)
s.EqualError(err, "InternalServiceError{Message: Inconsistent worklow. Expected domainID: deadbeef-0123-4567-890a-bcdef0123460, actual: another-domain-id}")
}

func (s *esanalyzerWorkflowTestSuite) TestFindLongRunningWorkflows() {
s.mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&elasticsearch.RawResponse{
TookInMillis: 12,
Hits: elasticsearch.SearchHits{
TotalHits: 1234,
},
},
nil).Times(1)

s.scopedMetricClient.On(
"AddCounter",
metrics.ESAnalyzerNumLongRunningWorkflows,
int64(1234),
).Return().Times(1)

s.config.ESAnalyzerWorkflowDurationWarnThresholds = dynamicconfig.GetStringPropertyFn(`{"test-domain/workflow1":"1m"}`)
_, err := s.activityEnv.ExecuteActivity(s.workflow.findLongRunningWorkflows)
s.NoError(err)
}

func (s *esanalyzerWorkflowTestSuite) TestFindStuckWorkflows() {
info := WorkflowTypeInfo{
DomainID: s.DomainID,
Expand Down Expand Up @@ -376,9 +415,8 @@ func (s *esanalyzerWorkflowTestSuite) TestFindStuckWorkflows() {
},
},
nil).Times(1)
s.mockMetricClient.On(
s.scopedMetricClient.On(
"AddCounter",
metrics.ESAnalyzerScope,
metrics.ESAnalyzerNumStuckWorkflowsDiscovered,
int64(2),
).Return().Times(1)
Expand Down
Loading

0 comments on commit 80700d8

Please sign in to comment.