diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go new file mode 100644 index 00000000000..8c9346391e6 --- /dev/null +++ b/service/worker/diagnostics/activities.go @@ -0,0 +1,52 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package diagnostics + +import ( + "context" + + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/worker/diagnostics/invariants" +) + +type retrieveExecutionHistoryInputParams struct { + domain string + execution *types.WorkflowExecution +} + +func (w *dw) retrieveExecutionHistory(ctx context.Context, info retrieveExecutionHistoryInputParams) (*types.GetWorkflowExecutionHistoryResponse, error) { + frontendClient := w.clientBean.GetFrontendClient() + return frontendClient.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{ + Domain: info.domain, + Execution: info.execution, + }) +} + +type identifyTimeoutsInputParams struct { + history *types.GetWorkflowExecutionHistoryResponse +} + +func (w *dw) identifyTimeouts(ctx context.Context, info identifyTimeoutsInputParams) ([]invariants.InvariantCheckResult, error) { + timeoutInvariant := invariants.NewTimeout(info.history) + return timeoutInvariant.Check(ctx) +} diff --git a/service/worker/diagnostics/activities_test.go b/service/worker/diagnostics/activities_test.go new file mode 100644 index 00000000000..44ca2657c70 --- /dev/null +++ b/service/worker/diagnostics/activities_test.go @@ -0,0 +1,96 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package diagnostics + +import ( + "context" + "encoding/json" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/client" + "github.com/uber/cadence/client/frontend" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/worker/diagnostics/invariants" +) + +func Test__retrieveExecutionHistory(t *testing.T) { + dwtest := testDiagnosticWorkflow(t) + result, err := dwtest.retrieveExecutionHistory(context.Background(), retrieveExecutionHistoryInputParams{ + domain: "test", + execution: &types.WorkflowExecution{ + WorkflowID: "123", + RunID: "abc", + }, + }) + require.NoError(t, err) + require.Equal(t, testWorkflowExecutionHistoryResponse(), result) +} + +func Test__identifyTimeouts(t *testing.T) { + dwtest := testDiagnosticWorkflow(t) + workflowTimeoutSecondInBytes, err := json.Marshal(int32(10)) + require.NoError(t, err) + expectedResult := []invariants.InvariantCheckResult{ + { + InvariantType: invariants.TimeoutTypeExecution.String(), + Reason: "START_TO_CLOSE", + Metadata: workflowTimeoutSecondInBytes, + }, + } + result, err := dwtest.identifyTimeouts(context.Background(), identifyTimeoutsInputParams{history: testWorkflowExecutionHistoryResponse()}) + require.NoError(t, err) + require.Equal(t, expectedResult, result) +} + +func testDiagnosticWorkflow(t *testing.T) *dw { + ctrl := gomock.NewController(t) + mockClientBean := client.NewMockBean(ctrl) + mockFrontendClient := frontend.NewMockClient(ctrl) + mockClientBean.EXPECT().GetFrontendClient().Return(mockFrontendClient).AnyTimes() + mockFrontendClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any()).Return(testWorkflowExecutionHistoryResponse(), nil).AnyTimes() + return &dw{ + clientBean: mockClientBean, + } +} + +func testWorkflowExecutionHistoryResponse() *types.GetWorkflowExecutionHistoryResponse { + return &types.GetWorkflowExecutionHistoryResponse{ + History: &types.History{ + Events: []*types.HistoryEvent{ + { + ID: 1, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10), + }, + }, + { + WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()}, + }, + }, + }, + } +} diff --git a/service/worker/diagnostics/module.go b/service/worker/diagnostics/module.go new file mode 100644 index 00000000000..3efa0f88eaa --- /dev/null +++ b/service/worker/diagnostics/module.go @@ -0,0 +1,89 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package diagnostics + +import ( + "context" + + "github.com/opentracing/opentracing-go" + "github.com/uber-go/tally" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + + "github.com/uber/cadence/client" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/metrics" +) + +type DiagnosticsWorkflow interface { + Start() error + Stop() +} + +type dw struct { + svcClient workflowserviceclient.Interface + clientBean client.Bean + metricsClient metrics.Client + tallyScope tally.Scope + worker worker.Worker +} + +type Params struct { + ServiceClient workflowserviceclient.Interface + ClientBean client.Bean + MetricsClient metrics.Client + TallyScope tally.Scope +} + +// New creates a new diagnostics workflow. +func New(params Params) DiagnosticsWorkflow { + return &dw{ + svcClient: params.ServiceClient, + metricsClient: params.MetricsClient, + tallyScope: params.TallyScope, + clientBean: params.ClientBean, + } +} + +// Start starts the worker +func (w *dw) Start() error { + workerOpts := worker.Options{ + MetricsScope: w.tallyScope, + BackgroundActivityContext: context.Background(), + Tracer: opentracing.GlobalTracer(), + MaxConcurrentActivityTaskPollers: 10, + MaxConcurrentDecisionTaskPollers: 10, + } + newWorker := worker.New(w.svcClient, common.SystemLocalDomainName, tasklist, workerOpts) + newWorker.RegisterWorkflowWithOptions(w.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow}) + newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity}) + newWorker.RegisterActivityWithOptions(w.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity}) + w.worker = newWorker + return newWorker.Start() +} + +func (w *dw) Stop() { + w.worker.Stop() +} diff --git a/service/worker/diagnostics/module_test.go b/service/worker/diagnostics/module_test.go new file mode 100644 index 00000000000..d8a8934f8c8 --- /dev/null +++ b/service/worker/diagnostics/module_test.go @@ -0,0 +1,60 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package diagnostics + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "go.uber.org/cadence/.gen/go/shared" + + "github.com/uber/cadence/client" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/resource" +) + +func Test__Start(t *testing.T) { + dwTest, mockResource := setuptest(t) + err := dwTest.Start() + require.NoError(t, err) + dwTest.Stop() + mockResource.Finish(t) +} + +func setuptest(t *testing.T) (DiagnosticsWorkflow, *resource.Test) { + ctrl := gomock.NewController(t) + mockClientBean := client.NewMockBean(ctrl) + mockResource := resource.NewTest(t, ctrl, metrics.Worker) + sdkClient := mockResource.GetSDKClient() + mockResource.SDKClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.DescribeDomainResponse{}, nil).AnyTimes() + mockResource.SDKClient.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForDecisionTaskResponse{}, nil).AnyTimes() + mockResource.SDKClient.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForActivityTaskResponse{}, nil).AnyTimes() + return New(Params{ + ServiceClient: sdkClient, + ClientBean: mockClientBean, + MetricsClient: nil, + TallyScope: tally.TestScope(nil), + }), mockResource +} diff --git a/service/worker/diagnostics/workflow.go b/service/worker/diagnostics/workflow.go new file mode 100644 index 00000000000..4f0a5bb3c1d --- /dev/null +++ b/service/worker/diagnostics/workflow.go @@ -0,0 +1,76 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package diagnostics + +import ( + "fmt" + "time" + + "go.uber.org/cadence/workflow" + + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/worker/diagnostics/invariants" +) + +const ( + diagnosticsWorkflow = "diagnostics-workflow" + tasklist = "wf-diagnostics" + + retrieveWfExecutionHistoryActivity = "retrieveWfExecutionHistory" + identifyTimeoutsActivity = "identifyTimeouts" +) + +type DiagnosticsWorkflowInput struct { + Domain string + WorkflowID string + RunID string +} + +func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) error { + activityOptions := workflow.ActivityOptions{ + ScheduleToCloseTimeout: time.Second * 10, + ScheduleToStartTimeout: time.Second * 5, + StartToCloseTimeout: time.Second * 5, + } + activityCtx := workflow.WithActivityOptions(ctx, activityOptions) + + var wfExecutionHistory *types.GetWorkflowExecutionHistoryResponse + err := workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{ + domain: params.Domain, + execution: &types.WorkflowExecution{ + WorkflowID: params.WorkflowID, + RunID: params.RunID, + }}).Get(ctx, &wfExecutionHistory) + if err != nil { + return fmt.Errorf("RetrieveExecutionHistory: %w", err) + } + + var checkResult []invariants.InvariantCheckResult + err = workflow.ExecuteActivity(activityCtx, w.identifyTimeouts, identifyTimeoutsInputParams{ + history: wfExecutionHistory}).Get(ctx, &checkResult) + if err != nil { + return fmt.Errorf("IdentifyTimeouts: %w", err) + } + + return nil +} diff --git a/service/worker/diagnostics/workflow_test.go b/service/worker/diagnostics/workflow_test.go new file mode 100644 index 00000000000..f5e258baf56 --- /dev/null +++ b/service/worker/diagnostics/workflow_test.go @@ -0,0 +1,101 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package diagnostics + +import ( + "errors" + "fmt" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/testsuite" + "go.uber.org/cadence/workflow" + + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/resource" +) + +type diagnosticsWorkflowTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite + workflowEnv *testsuite.TestWorkflowEnvironment + dw *dw +} + +func TestDiagnosticsWorkflowTestSuite(t *testing.T) { + suite.Run(t, new(diagnosticsWorkflowTestSuite)) +} + +func (s *diagnosticsWorkflowTestSuite) SetupTest() { + s.workflowEnv = s.NewTestWorkflowEnvironment() + controller := gomock.NewController(s.T()) + mockResource := resource.NewTest(s.T(), controller, metrics.Worker) + + s.dw = &dw{ + svcClient: mockResource.GetSDKClient(), + clientBean: mockResource.ClientBean, + } + + s.T().Cleanup(func() { + mockResource.Finish(s.T()) + }) + + s.workflowEnv.RegisterWorkflowWithOptions(s.dw.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow}) + s.workflowEnv.RegisterActivityWithOptions(s.dw.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity}) + s.workflowEnv.RegisterActivityWithOptions(s.dw.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity}) +} + +func (s *diagnosticsWorkflowTestSuite) TearDownTest() { + s.workflowEnv.AssertExpectations(s.T()) +} + +func (s *diagnosticsWorkflowTestSuite) TestWorkflow() { + params := &DiagnosticsWorkflowInput{ + Domain: "test", + WorkflowID: "123", + RunID: "abc", + } + s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil) + s.workflowEnv.OnActivity(identifyTimeoutsActivity, mock.Anything, mock.Anything).Return(nil, nil) + s.workflowEnv.ExecuteWorkflow(diagnosticsWorkflow, params) + s.True(s.workflowEnv.IsWorkflowCompleted()) +} + +func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() { + params := &DiagnosticsWorkflowInput{ + Domain: "test", + WorkflowID: "123", + RunID: "abc", + } + mockErr := errors.New("mockErr") + errExpected := fmt.Errorf("IdentifyTimeouts: %w", mockErr) + s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil) + s.workflowEnv.OnActivity(identifyTimeoutsActivity, mock.Anything, mock.Anything).Return(nil, mockErr) + s.workflowEnv.ExecuteWorkflow(diagnosticsWorkflow, params) + s.True(s.workflowEnv.IsWorkflowCompleted()) + s.Error(s.workflowEnv.GetWorkflowError()) + s.EqualError(s.workflowEnv.GetWorkflowError(), errExpected.Error()) +} diff --git a/service/worker/service.go b/service/worker/service.go index 5464a115693..f9fb113329c 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -39,6 +39,7 @@ import ( "github.com/uber/cadence/service/worker/archiver" "github.com/uber/cadence/service/worker/asyncworkflow" "github.com/uber/cadence/service/worker/batcher" + "github.com/uber/cadence/service/worker/diagnostics" "github.com/uber/cadence/service/worker/esanalyzer" "github.com/uber/cadence/service/worker/failovermanager" "github.com/uber/cadence/service/worker/indexer" @@ -220,6 +221,7 @@ func (s *Service) Start() { } s.startReplicator() + s.startDiagnostics() if s.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() { s.startArchiver() @@ -332,6 +334,19 @@ func (s *Service) startFixerWorkflowWorker() { } } +func (s *Service) startDiagnostics() { + params := diagnostics.Params{ + ServiceClient: s.params.PublicClient, + MetricsClient: s.GetMetricsClient(), + TallyScope: s.params.MetricScope, + ClientBean: s.GetClientBean(), + } + if err := diagnostics.New(params).Start(); err != nil { + s.Stop() + s.GetLogger().Fatal("error starting diagnostics", tag.Error(err)) + } +} + func (s *Service) startReplicator() { domainReplicationTaskExecutor := domain.NewReplicationTaskExecutor( s.Resource.GetDomainManager(),