Skip to content

Commit

Permalink
signal watchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender committed Jan 26, 2022
1 parent 564d06a commit 5b79e88
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 2 deletions.
21 changes: 21 additions & 0 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/worker/watchdog"
)

const (
Expand Down Expand Up @@ -209,6 +210,25 @@ func (t *taskImpl) Execute() error {
return t.taskExecutor.Execute(t, t.shouldProcessTask)
}

// ReportCorruptWorkflowToWatchDog reports the workflow execution this task
// belongs to as potentially corrupt through the watchdog client.
//
// The task's domain ID is resolved to a domain name via the shard's domain
// cache, and a watchdog client is built from the shard's metrics client,
// logger and SDK client for every call.
//
// It returns the domain-name lookup error if the lookup fails, otherwise the
// error (if any) returned by the watchdog client's ReportCorruptWorkflow.
func (t *taskImpl) ReportCorruptWorkflowToWatchDog() error {
	domainID := t.GetDomainID()
	wid := t.GetWorkflowID()
	rid := t.GetRunID()

	domainName, err := t.shard.GetDomainCache().GetDomainName(domainID)
	if err != nil {
		return err
	}

	watchDogClient := watchdog.NewClient(
		t.shard.GetMetricsClient(),
		t.shard.GetLogger(),
		t.shard.GetService().GetSDKClient(),
	)
	// Propagate the report failure instead of silently discarding it, so the
	// caller can decide whether to log or ignore it.
	return watchDogClient.ReportCorruptWorkflow(domainName, wid, rid)
}

func (t *taskImpl) HandleErr(
err error,
) (retErr error) {
Expand All @@ -224,6 +244,7 @@ func (t *taskImpl) HandleErr(
t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
t.logger.Error("Critical error processing task, retrying.",
tag.Error(err), tag.OperationCritical, tag.TaskType(t.GetTaskType()))
t.ReportCorruptWorkflowToWatchDog()
}
}
}()
Expand Down
85 changes: 85 additions & 0 deletions service/worker/watchdog/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package watchdog

import (
"context"
"time"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
cclient "go.uber.org/cadence/client"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
)

// Client reports workflow executions suspected of corruption to the
// watchdog workflow.
type Client interface {
	// ReportCorruptWorkflow reports the execution identified by domainName,
	// workflowID and runID, returning any error from sending the report.
	ReportCorruptWorkflow(domainName string, workflowID string, runID string) error
}

// clientImpl is the Client implementation; it holds the metrics client and
// logger it was built with and the Cadence SDK client used to send signals.
type clientImpl struct {
	metricsClient metrics.Client
	logger        log.Logger
	cadenceClient cclient.Client
}

// Compile-time assertion that *clientImpl satisfies Client.
var _ Client = (*clientImpl)(nil)

// signalTimeout is the deadline applied to the context of each signal call
// made by ReportCorruptWorkflow.
const signalTimeout = 400 * time.Millisecond

// NewClient creates a new Client
// NewClient returns a Client backed by a Cadence SDK client that is bound to
// common.SystemLocalDomainName and constructed from publicClient with default
// (empty) options. metricsClient and logger are stored on the returned client.
func NewClient(
	metricsClient metrics.Client,
	logger log.Logger,
	publicClient workflowserviceclient.Interface,
) Client {
	sdkClient := cclient.NewClient(publicClient, common.SystemLocalDomainName, &cclient.Options{})

	impl := new(clientImpl)
	impl.metricsClient = metricsClient
	impl.logger = logger
	impl.cadenceClient = sdkClient
	return impl
}

// ReportCorruptWorkflow sends a CorruptWFRequest describing the given
// execution as a signal named corruptWorkflowWatchdogChannelName to the
// workflow with ID watchdogWFID. The call is bounded by signalTimeout.
//
// It returns whatever error SignalWorkflow returns, or nil on success.
func (c *clientImpl) ReportCorruptWorkflow(
	domainName string,
	workflowID string,
	runID string,
) error {
	ctx, cancel := context.WithTimeout(context.Background(), signalTimeout)
	defer cancel()

	execution := types.WorkflowExecution{
		WorkflowID: workflowID,
		RunID:      runID,
	}
	// NOTE(review): `workflow` is an unexported field while DomainName is
	// exported; confirm the field is carried through signal serialization.
	payload := CorruptWFRequest{
		DomainName: domainName,
		workflow:   execution,
	}

	// The run ID argument is left empty; only the workflow ID is specified.
	return c.cadenceClient.SignalWorkflow(ctx, watchdogWFID, "", corruptWorkflowWatchdogChannelName, payload)
}
4 changes: 2 additions & 2 deletions service/worker/watchdog/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
ExpirationInterval: time.Hour,
}

handleCorruptedWorkflowOptions = workflow.ActivityOptions{
handleCorruptWorkflowOptions = workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &retryPolicy,
Expand Down Expand Up @@ -123,7 +123,7 @@ func (w *Workflow) workflowFunc(ctx workflow.Context) error {
logger.Warn("Corrupt workflow execution is paused. Enable to continue processing")
continue
}
opt := workflow.WithActivityOptions(ctx, handleCorruptedWorkflowOptions)
opt := workflow.WithActivityOptions(ctx, handleCorruptWorkflowOptions)
_ = workflow.ExecuteActivity(opt, handleCorrputedWorkflowActivity, request).Get(ctx, nil)
}
}
Expand Down

0 comments on commit 5b79e88

Please sign in to comment.