Skip to content

Commit

Permalink
setup a workflow that performs the diagnostics for a given workflow (u…
Browse files Browse the repository at this point in the history
…ber#6225)

* setup a workflow that performs the diagnostics for a given workflow

* update copyright

* add tests

* Update module_test.go

* Update module_test.go
  • Loading branch information
sankari165 authored Aug 14, 2024
1 parent 7dda2c9 commit 952af5a
Show file tree
Hide file tree
Showing 7 changed files with 489 additions and 0 deletions.
52 changes: 52 additions & 0 deletions service/worker/diagnostics/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package diagnostics

import (
"context"

"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/invariants"
)

type retrieveExecutionHistoryInputParams struct {
domain string
execution *types.WorkflowExecution
}

func (w *dw) retrieveExecutionHistory(ctx context.Context, info retrieveExecutionHistoryInputParams) (*types.GetWorkflowExecutionHistoryResponse, error) {
frontendClient := w.clientBean.GetFrontendClient()
return frontendClient.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
Domain: info.domain,
Execution: info.execution,
})
}

type identifyTimeoutsInputParams struct {
history *types.GetWorkflowExecutionHistoryResponse
}

func (w *dw) identifyTimeouts(ctx context.Context, info identifyTimeoutsInputParams) ([]invariants.InvariantCheckResult, error) {
timeoutInvariant := invariants.NewTimeout(info.history)
return timeoutInvariant.Check(ctx)
}
96 changes: 96 additions & 0 deletions service/worker/diagnostics/activities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package diagnostics

import (
"context"
"encoding/json"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/client"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/invariants"
)

func Test__retrieveExecutionHistory(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
result, err := dwtest.retrieveExecutionHistory(context.Background(), retrieveExecutionHistoryInputParams{
domain: "test",
execution: &types.WorkflowExecution{
WorkflowID: "123",
RunID: "abc",
},
})
require.NoError(t, err)
require.Equal(t, testWorkflowExecutionHistoryResponse(), result)
}

func Test__identifyTimeouts(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
workflowTimeoutSecondInBytes, err := json.Marshal(int32(10))
require.NoError(t, err)
expectedResult := []invariants.InvariantCheckResult{
{
InvariantType: invariants.TimeoutTypeExecution.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutSecondInBytes,
},
}
result, err := dwtest.identifyTimeouts(context.Background(), identifyTimeoutsInputParams{history: testWorkflowExecutionHistoryResponse()})
require.NoError(t, err)
require.Equal(t, expectedResult, result)
}

func testDiagnosticWorkflow(t *testing.T) *dw {
ctrl := gomock.NewController(t)
mockClientBean := client.NewMockBean(ctrl)
mockFrontendClient := frontend.NewMockClient(ctrl)
mockClientBean.EXPECT().GetFrontendClient().Return(mockFrontendClient).AnyTimes()
mockFrontendClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any()).Return(testWorkflowExecutionHistoryResponse(), nil).AnyTimes()
return &dw{
clientBean: mockClientBean,
}
}

func testWorkflowExecutionHistoryResponse() *types.GetWorkflowExecutionHistoryResponse {
return &types.GetWorkflowExecutionHistoryResponse{
History: &types.History{
Events: []*types.HistoryEvent{
{
ID: 1,
WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10),
},
},
{
WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()},
},
},
},
}
}
89 changes: 89 additions & 0 deletions service/worker/diagnostics/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package diagnostics

import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
)

type DiagnosticsWorkflow interface {
Start() error
Stop()
}

type dw struct {
svcClient workflowserviceclient.Interface
clientBean client.Bean
metricsClient metrics.Client
tallyScope tally.Scope
worker worker.Worker
}

type Params struct {
ServiceClient workflowserviceclient.Interface
ClientBean client.Bean
MetricsClient metrics.Client
TallyScope tally.Scope
}

// New creates a new diagnostics workflow.
func New(params Params) DiagnosticsWorkflow {
return &dw{
svcClient: params.ServiceClient,
metricsClient: params.MetricsClient,
tallyScope: params.TallyScope,
clientBean: params.ClientBean,
}
}

// Start starts the worker
func (w *dw) Start() error {
workerOpts := worker.Options{
MetricsScope: w.tallyScope,
BackgroundActivityContext: context.Background(),
Tracer: opentracing.GlobalTracer(),
MaxConcurrentActivityTaskPollers: 10,
MaxConcurrentDecisionTaskPollers: 10,
}
newWorker := worker.New(w.svcClient, common.SystemLocalDomainName, tasklist, workerOpts)
newWorker.RegisterWorkflowWithOptions(w.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
newWorker.RegisterActivityWithOptions(w.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
w.worker = newWorker
return newWorker.Start()
}

func (w *dw) Stop() {
w.worker.Stop()
}
60 changes: 60 additions & 0 deletions service/worker/diagnostics/module_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package diagnostics

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/shared"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/resource"
)

func Test__Start(t *testing.T) {
dwTest, mockResource := setuptest(t)
err := dwTest.Start()
require.NoError(t, err)
dwTest.Stop()
mockResource.Finish(t)
}

func setuptest(t *testing.T) (DiagnosticsWorkflow, *resource.Test) {
ctrl := gomock.NewController(t)
mockClientBean := client.NewMockBean(ctrl)
mockResource := resource.NewTest(t, ctrl, metrics.Worker)
sdkClient := mockResource.GetSDKClient()
mockResource.SDKClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.DescribeDomainResponse{}, nil).AnyTimes()
mockResource.SDKClient.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForDecisionTaskResponse{}, nil).AnyTimes()
mockResource.SDKClient.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForActivityTaskResponse{}, nil).AnyTimes()
return New(Params{
ServiceClient: sdkClient,
ClientBean: mockClientBean,
MetricsClient: nil,
TallyScope: tally.TestScope(nil),
}), mockResource
}
76 changes: 76 additions & 0 deletions service/worker/diagnostics/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package diagnostics

import (
"fmt"
"time"

"go.uber.org/cadence/workflow"

"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/invariants"
)

const (
diagnosticsWorkflow = "diagnostics-workflow"
tasklist = "wf-diagnostics"

retrieveWfExecutionHistoryActivity = "retrieveWfExecutionHistory"
identifyTimeoutsActivity = "identifyTimeouts"
)

type DiagnosticsWorkflowInput struct {
Domain string
WorkflowID string
RunID string
}

func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) error {
activityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second * 10,
ScheduleToStartTimeout: time.Second * 5,
StartToCloseTimeout: time.Second * 5,
}
activityCtx := workflow.WithActivityOptions(ctx, activityOptions)

var wfExecutionHistory *types.GetWorkflowExecutionHistoryResponse
err := workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{
domain: params.Domain,
execution: &types.WorkflowExecution{
WorkflowID: params.WorkflowID,
RunID: params.RunID,
}}).Get(ctx, &wfExecutionHistory)
if err != nil {
return fmt.Errorf("RetrieveExecutionHistory: %w", err)
}

var checkResult []invariants.InvariantCheckResult
err = workflow.ExecuteActivity(activityCtx, w.identifyTimeouts, identifyTimeoutsInputParams{
history: wfExecutionHistory}).Get(ctx, &checkResult)
if err != nil {
return fmt.Errorf("IdentifyTimeouts: %w", err)
}

return nil
}
Loading

0 comments on commit 952af5a

Please sign in to comment.