Skip to content

Commit

Permalink
Fix admin remove task command for timer queue (#3157)
Browse files Browse the repository at this point in the history
* Remove DeleteTask method from persistence interface
* Remove related metrics
* Use CompleteTask method of executionManager to delete task
* Add visibility timestamp tag when initializing logger for task
* Add visibility timestamp flag to admin remove task command
  • Loading branch information
yycptt authored Apr 1, 2020
1 parent 9b02fec commit 0976e69
Show file tree
Hide file tree
Showing 20 changed files with 128 additions and 155 deletions.
58 changes: 51 additions & 7 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,17 @@ const (

// DomainDataKeyForManagedFailover is the DomainData key for managed failover.
const DomainDataKeyForManagedFailover = "IsManagedByCadence"

// TaskType is the enum for representing different task types.
type TaskType int

// The numeric values start at 2 so that they stay consistent with the row
// type defined for cassandra.
const (
	// TaskTypeTransfer is the task type for transfer task
	TaskTypeTransfer TaskType = 2
	// TaskTypeTimer is the task type for timer task
	TaskTypeTimer TaskType = 3
	// TaskTypeReplication is the task type for replication task
	TaskTypeReplication TaskType = 4
)
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,11 @@ func TaskType(taskType int) Tag {
return newInt("queue-task-type", taskType)
}

// TaskVisibilityTimestamp returns the tag carrying a task's visibility
// timestamp under the "queue-task-visibility-timestamp" key. The value is
// attached as-is.
// NOTE(review): the visible caller passes UnixNano(), so the value is
// presumably nanoseconds since epoch - confirm for other callers.
func TaskVisibilityTimestamp(timestamp int64) Tag {
	const key = "queue-task-visibility-timestamp"
	return newInt64(key, timestamp)
}

// NumberProcessed returns tag for NumberProcessed
func NumberProcessed(n int) Tag {
return newInt("number-processed", n)
Expand Down
3 changes: 0 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ const (
PersistenceDeleteWorkflowExecutionScope
// PersistenceDeleteCurrentWorkflowExecutionScope tracks DeleteCurrentWorkflowExecution calls made by service to persistence layer
PersistenceDeleteCurrentWorkflowExecutionScope
// PersistenceDeleteTaskScope tracks RemoveTask calls made by service to persistence layer
PersistenceDeleteTaskScope
// PersistenceGetCurrentExecutionScope tracks GetCurrentExecution calls made by service to persistence layer
PersistenceGetCurrentExecutionScope
// PersistenceGetTransferTasksScope tracks GetTransferTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1053,7 +1051,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceResetWorkflowExecutionScope: {operation: "ResetWorkflowExecution"},
PersistenceDeleteWorkflowExecutionScope: {operation: "DeleteWorkflowExecution"},
PersistenceDeleteCurrentWorkflowExecutionScope: {operation: "DeleteCurrentWorkflowExecution"},
PersistenceDeleteTaskScope: {operation: "PersistenceDelete"},
PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"},
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
Expand Down
20 changes: 4 additions & 16 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

package mocks

import "github.com/uber/cadence/common/persistence"
import "github.com/stretchr/testify/mock"
import (
"github.com/stretchr/testify/mock"
"github.com/uber/cadence/common/persistence"
)

// ExecutionManager mock implementation
type ExecutionManager struct {
Expand Down Expand Up @@ -153,20 +155,6 @@ func (_m *ExecutionManager) ResetWorkflowExecution(request *persistence.ResetWor
return r0
}

// DeleteTask provides a mock function with given fields: request
//
// If the first configured return value is a function with a matching
// signature it is invoked with request and its result is returned; otherwise
// the first configured return value is returned as an error.
func (_m *ExecutionManager) DeleteTask(request *persistence.DeleteTaskRequest) error {
	ret := _m.Called(request)

	if rf, ok := ret.Get(0).(func(*persistence.DeleteTaskRequest) error); ok {
		return rf(request)
	}
	return ret.Error(0)
}

// DeleteWorkflowExecution provides a mock function with given fields: request
func (_m *ExecutionManager) DeleteWorkflowExecution(request *persistence.DeleteWorkflowExecutionRequest) error {
ret := _m.Called(request)
Expand Down
50 changes: 1 addition & 49 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import (
"strings"
"time"

"github.com/uber/cadence/common/cassandra"

"github.com/gocql/gocql"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cassandra"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/config"
Expand Down Expand Up @@ -1942,53 +1941,6 @@ func (d *cassandraPersistence) assertNotCurrentExecution(
return nil
}

// DeleteTask deletes a single task row identified by the request's shard ID,
// task type and task ID.
//
// The task type selects the placeholder domain/workflow/run IDs used to
// address the row. Every task type is addressed with
// defaultVisibilityTimestamp. An unrecognised task type yields a plain error.
// A failed query is reported as ServiceBusyError when isThrottlingError
// reports true for it, and as InternalServiceError otherwise.
func (d *cassandraPersistence) DeleteTask(request *p.DeleteTaskRequest) error {
	var domainID, workflowID, runID string

	switch request.Type {
	case rowTypeTransferTask:
		domainID, workflowID, runID = rowTypeTransferDomainID, rowTypeTransferWorkflowID, rowTypeTransferRunID
	case rowTypeTimerTask:
		domainID, workflowID, runID = rowTypeTimerDomainID, rowTypeTimerWorkflowID, rowTypeTimerRunID
	case rowTypeReplicationTask:
		domainID, workflowID, runID = rowTypeReplicationDomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID
	default:
		return fmt.Errorf("DeleteTask type id is not one of 2 (transfer task), 3 (timer task), 4 (replication task) ")
	}

	err := d.session.Query(
		templateDeleteWorkflowExecutionMutableStateQuery,
		request.ShardID,
		request.Type,
		domainID,
		workflowID,
		runID,
		defaultVisibilityTimestamp,
		request.TaskID,
	).Exec()
	if err == nil {
		return nil
	}

	// Both failure kinds carry the same message; only the error type differs.
	message := fmt.Sprintf("DeleteTask operation failed. Error: %v", err)
	if isThrottlingError(err) {
		return &workflow.ServiceBusyError{Message: message}
	}
	return &workflow.InternalServiceError{Message: message}
}

func (d *cassandraPersistence) DeleteWorkflowExecution(request *p.DeleteWorkflowExecutionRequest) error {
query := d.session.Query(templateDeleteWorkflowExecutionMutableStateQuery,
d.shardID,
Expand Down
14 changes: 1 addition & 13 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import (
"strings"
"time"

"github.com/uber/cadence/common/checksum"

"github.com/pborman/uuid"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/codec"
)

Expand Down Expand Up @@ -926,14 +925,6 @@ type (
RunID string
}

// DeleteTaskRequest is used to delete a task that is corrupted and needs to be removed,
// e.g. a corrupted history event batch, or event IDs that are not continuous
DeleteTaskRequest struct {
// TaskID is the ID of the task to delete
TaskID int64
// Type is the task type: 2 (transfer task), 3 (timer task) or 4 (replication task)
Type int
// ShardID is the ID of the shard used to address the task row
ShardID int
}

// DeleteCurrentWorkflowExecutionRequest is used to delete the current workflow execution
DeleteCurrentWorkflowExecutionRequest struct {
DomainID string
Expand Down Expand Up @@ -1502,9 +1493,6 @@ type (
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(request *RangeCompleteTimerTaskRequest) error

// Remove Task due to corrupted data
DeleteTask(request *DeleteTaskRequest) error
}

// ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
Expand Down
6 changes: 0 additions & 6 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,6 @@ func (m *executionManagerImpl) DeserializeVersionHistories(
return NewVersionHistoriesFromThrift(versionHistories), nil
}

// DeleteTask forwards the request unchanged to the underlying persistence
// store and returns whatever error that store reports.
func (m *executionManagerImpl) DeleteTask(request *DeleteTaskRequest) error {
	err := m.persistence.DeleteTask(request)
	return err
}

func (m *executionManagerImpl) DeleteWorkflowExecution(
request *DeleteWorkflowExecutionRequest,
) error {
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ type (
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(request *RangeCompleteTimerTaskRequest) error

// Remove corrupted task
DeleteTask(request *DeleteTaskRequest) error
}

// HistoryStore is to manager workflow history events
Expand Down
13 changes: 0 additions & 13 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,19 +520,6 @@ func (p *workflowExecutionPersistenceClient) RangeCompleteTimerTask(request *Ran
return err
}

// DeleteTask delegates to the wrapped persistence client while emitting
// metrics under PersistenceDeleteTaskScope: a request counter, a latency
// timer around the call, and an error metric when the call fails.
// The error from the wrapped client is returned unchanged.
func (p *workflowExecutionPersistenceClient) DeleteTask(request *DeleteTaskRequest) error {
	p.metricClient.IncCounter(metrics.PersistenceDeleteTaskScope, metrics.PersistenceRequests)

	sw := p.metricClient.StartTimer(metrics.PersistenceDeleteTaskScope, metrics.PersistenceLatency)
	err := p.persistence.DeleteTask(request)
	sw.Stop()

	if err != nil {
		// Report failures under the same scope as the request and latency
		// metrics. This previously used PersistenceRangeCompleteTimerTaskScope,
		// which attributed DeleteTask errors to RangeCompleteTimerTask.
		p.updateErrorMetric(metrics.PersistenceDeleteTaskScope, err)
	}

	return err
}

func (p *workflowExecutionPersistenceClient) updateErrorMetric(scope int, err error) {
switch err.(type) {
case *WorkflowExecutionAlreadyStartedError:
Expand Down
9 changes: 0 additions & 9 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,6 @@ func (p *workflowExecutionRateLimitedPersistenceClient) RangeCompleteTimerTask(r
return err
}

// DeleteTask passes the request to the wrapped persistence client, unless the
// rate limiter rejects the call, in which case ErrPersistenceLimitExceeded is
// returned without touching the wrapped client.
func (p *workflowExecutionRateLimitedPersistenceClient) DeleteTask(request *DeleteTaskRequest) error {
	if !p.rateLimiter.Allow() {
		return ErrPersistenceLimitExceeded
	}
	return p.persistence.DeleteTask(request)
}

// Close closes the wrapped persistence client; the rate limiter is not consulted.
func (p *workflowExecutionRateLimitedPersistenceClient) Close() {
p.persistence.Close()
}
Expand Down
6 changes: 0 additions & 6 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,12 +770,6 @@ func (m *sqlExecutionManager) conflictResolveWorkflowExecutionTx(
return nil
}

// DeleteTask is currently a no-op for the SQL store: it ignores the request
// and always returns nil, so no task is actually removed.
func (m *sqlExecutionManager) DeleteTask(request *p.DeleteTaskRequest) error {
//TODO: This needs implement when we use sql for tasks.
// https://github.com/uber/cadence/issues/2479
return nil
}

func (m *sqlExecutionManager) DeleteWorkflowExecution(
request *p.DeleteWorkflowExecutionRequest,
) error {
Expand Down
25 changes: 19 additions & 6 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/pborman/uuid"
"go.uber.org/yarpc/yarpcerrors"
Expand Down Expand Up @@ -77,6 +78,7 @@ var (
errSourceClusterNotSet = &gen.BadRequestError{Message: "Source Cluster not set on request."}
errShardIDNotSet = &gen.BadRequestError{Message: "Shard ID not set on request."}
errTimestampNotSet = &gen.BadRequestError{Message: "Timestamp not set on request."}
errInvalidTaskType = &gen.BadRequestError{Message: "Invalid task type"}
errHistoryHostThrottle = &gen.ServiceBusyError{Message: "History host rps exceeded"}
errShuttingDown = &gen.InternalServiceError{Message: "Shutting down"}
)
Expand Down Expand Up @@ -724,13 +726,24 @@ func (h *Handler) RemoveTask(
if err != nil {
return err
}
deleteTaskRequest := &persistence.DeleteTaskRequest{
TaskID: request.GetTaskID(),
Type: int(request.GetType()),
ShardID: int(request.GetShardID()),

switch taskType := common.TaskType(request.GetType()); taskType {
case common.TaskTypeTransfer:
return executionMgr.CompleteTransferTask(&persistence.CompleteTransferTaskRequest{
TaskID: request.GetTaskID(),
})
case common.TaskTypeTimer:
return executionMgr.CompleteTimerTask(&persistence.CompleteTimerTaskRequest{
VisibilityTimestamp: time.Unix(0, request.GetVisibilityTimestamp()),
TaskID: request.GetTaskID(),
})
case common.TaskTypeReplication:
return executionMgr.CompleteReplicationTask(&persistence.CompleteReplicationTaskRequest{
TaskID: request.GetTaskID(),
})
default:
return errInvalidTaskType
}
err = executionMgr.DeleteTask(deleteTaskRequest)
return err
}

// CloseShard closes a shard hosted by this instance
Expand Down
1 change: 1 addition & 0 deletions service/history/nDCTaskUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func initializeLoggerForTask(
taskLogger := logger.WithTags(
tag.ShardID(shardID),
tag.TaskID(task.GetTaskID()),
tag.TaskVisibilityTimestamp(task.GetVisibilityTimestamp().UnixNano()),
tag.FailoverVersion(task.GetVersion()),
tag.TaskType(task.GetTaskType()),
tag.WorkflowDomainID(task.GetDomainID()),
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newTimerQueueActiveProcessor(
historyService.metricsClient.Scope(
getTimerTaskMetricScope(taskInfo.GetTaskType(), true),
),
logger,
initializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
timerTaskFilter,
processor.taskExecutor,
redispatchQueue,
Expand Down Expand Up @@ -223,7 +223,7 @@ func newTimerQueueFailoverProcessor(
historyService.metricsClient.Scope(
getTimerTaskMetricScope(taskInfo.GetTaskType(), true),
),
logger,
initializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
timerTaskFilter,
processor.taskExecutor,
redispatchQueue,
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newTimerQueueStandbyProcessor(
historyService.metricsClient.Scope(
getTimerTaskMetricScope(taskInfo.GetTaskType(), false),
),
logger,
initializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
timerTaskFilter,
processor.taskExecutor,
redispatchQueue,
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func newTransferQueueStandbyProcessor(
historyService.metricsClient.Scope(
getTransferTaskMetricsScope(taskInfo.GetTaskType(), false),
),
logger,
initializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
transferTaskFilter,
processor.taskExecutor,
redispatchQueue,
Expand Down
16 changes: 10 additions & 6 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,23 @@ func newAdminShardManagementCommands() []cli.Command {
{
Name: "removeTask",
Aliases: []string{"rmtk"},
Usage: "remove a task based on shardID, typeID and taskID",
Usage: "remove a task based on shardID, task type, taskID, and task visibility timestamp",
Flags: []cli.Flag{
cli.IntFlag{
Name: FlagShardID,
Usage: "ShardID for the cadence cluster to manage",
Usage: "shardID",
},
cli.Int64Flag{
Name: FlagRemoveTaskID,
Usage: "task id which user want to specify",
Name: FlagTaskID,
Usage: "taskID",
},
cli.IntFlag{
Name: FlagRemoveTypeID,
Usage: "type id which user want to specify: 2 (transfer task), 3 (timer task), 4 (replication task)",
Name: FlagTaskType,
Usage: "task type : 2 (transfer task), 3 (timer task) or 4 (replication task)",
},
cli.Int64Flag{
Name: FlagTaskVisibilityTimestamp,
Usage: "task visibility timestamp in nano (required for removing timer task)",
},
},
Action: func(c *cli.Context) {
Expand Down
Loading

0 comments on commit 0976e69

Please sign in to comment.