Skip to content

Commit

Permalink
Fix admin remove task command for timer queue (#3157)
Browse files Browse the repository at this point in the history
* Remove DeleteTask method from persistence interface
* Remove related metrics
* Use CompleteTask method of executionManager to delete task
* Add visibility timestamp tag when initializing logger for task
* Add visibility timestamp flag to admin remove task command
  • Loading branch information
yycptt authored and anish531213 committed Apr 10, 2020
1 parent 3560992 commit 19c32ad
Show file tree
Hide file tree
Showing 20 changed files with 265 additions and 159 deletions.
69 changes: 57 additions & 12 deletions .gen/go/shared/shared.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,20 @@ const (
// AdvancedVisibilityWritingModeDual means write to both normal visibility and advanced visibility store
AdvancedVisibilityWritingModeDual = "dual"
)

const (
	// DomainDataKeyForManagedFailover is the DomainData key used for managed failover.
	DomainDataKeyForManagedFailover = "IsManagedByCadence"
)

// TaskType is the enum for representing different task types.
type TaskType int

// The task type values begin at 2 (not 0) so that they stay consistent with
// the row types defined for cassandra.
const (
	// TaskTypeTransfer is the task type for transfer task
	TaskTypeTransfer TaskType = 2 + iota
	// TaskTypeTimer is the task type for timer task
	TaskTypeTimer
	// TaskTypeReplication is the task type for replication task
	TaskTypeReplication
)
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,11 @@ func TaskType(taskType int) Tag {
return newInt("queue-task-type", taskType)
}

// TaskVisibilityTimestamp returns the tag that carries a task's
// visibilityTimestamp under the "queue-task-visibility-timestamp" key.
func TaskVisibilityTimestamp(timestamp int64) Tag {
	const key = "queue-task-visibility-timestamp"
	return newInt64(key, timestamp)
}

// NumberProcessed returns tag for NumberProcessed
func NumberProcessed(n int) Tag {
return newInt("number-processed", n)
Expand Down
3 changes: 0 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ const (
PersistenceDeleteWorkflowExecutionScope
// PersistenceDeleteCurrentWorkflowExecutionScope tracks DeleteCurrentWorkflowExecution calls made by service to persistence layer
PersistenceDeleteCurrentWorkflowExecutionScope
// PersistenceDeleteTaskScope tracks RemoveTask calls made by service to persistence layer
PersistenceDeleteTaskScope
// PersistenceGetCurrentExecutionScope tracks GetCurrentExecution calls made by service to persistence layer
PersistenceGetCurrentExecutionScope
// PersistenceGetTransferTasksScope tracks GetTransferTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1053,7 +1051,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceResetWorkflowExecutionScope: {operation: "ResetWorkflowExecution"},
PersistenceDeleteWorkflowExecutionScope: {operation: "DeleteWorkflowExecution"},
PersistenceDeleteCurrentWorkflowExecutionScope: {operation: "DeleteCurrentWorkflowExecution"},
PersistenceDeleteTaskScope: {operation: "PersistenceDelete"},
PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"},
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
Expand Down
20 changes: 4 additions & 16 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

package mocks

import "github.com/uber/cadence/common/persistence"
import "github.com/stretchr/testify/mock"
import (
"github.com/stretchr/testify/mock"
"github.com/uber/cadence/common/persistence"
)

// ExecutionManager mock implementation
type ExecutionManager struct {
Expand Down Expand Up @@ -153,20 +155,6 @@ func (_m *ExecutionManager) ResetWorkflowExecution(request *persistence.ResetWor
return r0
}

// DeleteTask provides a mock function with given fields: request
//
// When the first configured return value is a function with the matching
// signature, that function is invoked with request and its result returned;
// otherwise the first return value is returned as an error.
func (_m *ExecutionManager) DeleteTask(request *persistence.DeleteTaskRequest) error {
	ret := _m.Called(request)

	if rf, ok := ret.Get(0).(func(*persistence.DeleteTaskRequest) error); ok {
		return rf(request)
	}

	return ret.Error(0)
}

// DeleteWorkflowExecution provides a mock function with given fields: request
func (_m *ExecutionManager) DeleteWorkflowExecution(request *persistence.DeleteWorkflowExecutionRequest) error {
ret := _m.Called(request)
Expand Down
50 changes: 1 addition & 49 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import (
"strings"
"time"

"github.com/uber/cadence/common/cassandra"

"github.com/gocql/gocql"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cassandra"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/config"
Expand Down Expand Up @@ -1942,53 +1941,6 @@ func (d *cassandraPersistence) assertNotCurrentExecution(
return nil
}

// DeleteTask executes templateDeleteWorkflowExecutionMutableStateQuery for the
// task identified by request.ShardID, request.Type and request.TaskID.
//
// request.Type selects which placeholder domain/workflow/run IDs are bound to
// the query; any type other than the transfer, timer or replication row type
// is rejected with an error before the query is issued.
//
// A failed query is reported as *workflow.ServiceBusyError when
// isThrottlingError recognizes it, and as *workflow.InternalServiceError
// otherwise.
//
// NOTE(review): the query always binds defaultVisibilityTimestamp, regardless
// of the task type. Presumably timer task rows are stored under their own
// visibility timestamp, in which case this would not match them -- confirm.
func (d *cassandraPersistence) DeleteTask(request *p.DeleteTaskRequest) error {
	var (
		domainID   string
		workflowID string
		runID      string
	)

	switch request.Type {
	case rowTypeTransferTask:
		domainID, workflowID, runID = rowTypeTransferDomainID, rowTypeTransferWorkflowID, rowTypeTransferRunID
	case rowTypeTimerTask:
		domainID, workflowID, runID = rowTypeTimerDomainID, rowTypeTimerWorkflowID, rowTypeTimerRunID
	case rowTypeReplicationTask:
		domainID, workflowID, runID = rowTypeReplicationDomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID
	default:
		return fmt.Errorf("DeleteTask type id is not one of 2 (transfer task), 3 (timer task), 4 (replication task) ")
	}

	execErr := d.session.Query(
		templateDeleteWorkflowExecutionMutableStateQuery,
		request.ShardID,
		request.Type,
		domainID,
		workflowID,
		runID,
		defaultVisibilityTimestamp,
		request.TaskID,
	).Exec()
	if execErr == nil {
		return nil
	}

	// Both error kinds carry the same message; only the wrapper type differs.
	message := fmt.Sprintf("DeleteTask operation failed. Error: %v", execErr)
	if isThrottlingError(execErr) {
		return &workflow.ServiceBusyError{Message: message}
	}
	return &workflow.InternalServiceError{Message: message}
}

func (d *cassandraPersistence) DeleteWorkflowExecution(request *p.DeleteWorkflowExecutionRequest) error {
query := d.session.Query(templateDeleteWorkflowExecutionMutableStateQuery,
d.shardID,
Expand Down
14 changes: 1 addition & 13 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import (
"strings"
"time"

"github.com/uber/cadence/common/checksum"

"github.com/pborman/uuid"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/codec"
)

Expand Down Expand Up @@ -926,14 +925,6 @@ type (
RunID string
}

// DeleteTaskRequest is used to delete a task that is corrupted and needs to be removed,
// e.g. a corrupted history event batch, or event IDs that are not continuous
DeleteTaskRequest struct {
// TaskID is the ID of the task to delete
TaskID int64
// Type is the type of the task to delete
Type int
// ShardID is the ID of the shard the task belongs to
ShardID int
}

// DeleteCurrentWorkflowExecutionRequest is used to delete the current workflow execution
DeleteCurrentWorkflowExecutionRequest struct {
DomainID string
Expand Down Expand Up @@ -1500,9 +1491,6 @@ type (
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(request *RangeCompleteTimerTaskRequest) error

// Remove Task due to corrupted data
DeleteTask(request *DeleteTaskRequest) error
}

// ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
Expand Down
6 changes: 0 additions & 6 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,6 @@ func (m *executionManagerImpl) DeserializeVersionHistories(
return NewVersionHistoriesFromThrift(versionHistories), nil
}

// DeleteTask hands the request, unchanged, to the underlying persistence
// store and returns whatever error that store reports.
func (m *executionManagerImpl) DeleteTask(request *DeleteTaskRequest) error {
	err := m.persistence.DeleteTask(request)
	return err
}

func (m *executionManagerImpl) DeleteWorkflowExecution(
request *DeleteWorkflowExecutionRequest,
) error {
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ type (
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(request *RangeCompleteTimerTaskRequest) error

// Remove corrupted task
DeleteTask(request *DeleteTaskRequest) error
}

// HistoryStore is to manager workflow history events
Expand Down
13 changes: 0 additions & 13 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,19 +520,6 @@ func (p *workflowExecutionPersistenceClient) RangeCompleteTimerTask(request *Ran
return err
}

// DeleteTask wraps the underlying persistence DeleteTask call with metrics.
//
// It increments the request counter and records call latency under
// metrics.PersistenceDeleteTaskScope, then delegates to p.persistence. If the
// call fails, the error is also classified via updateErrorMetric under the
// same scope. The error from the underlying call is returned unchanged.
func (p *workflowExecutionPersistenceClient) DeleteTask(request *DeleteTaskRequest) error {
	p.metricClient.IncCounter(metrics.PersistenceDeleteTaskScope, metrics.PersistenceRequests)

	sw := p.metricClient.StartTimer(metrics.PersistenceDeleteTaskScope, metrics.PersistenceLatency)
	err := p.persistence.DeleteTask(request)
	sw.Stop()

	if err != nil {
		// Report failures under the DeleteTask scope, matching the request
		// counter and latency timer above. This previously used
		// PersistenceRangeCompleteTimerTaskScope, which attributed DeleteTask
		// errors to the wrong operation.
		p.updateErrorMetric(metrics.PersistenceDeleteTaskScope, err)
	}

	return err
}

func (p *workflowExecutionPersistenceClient) updateErrorMetric(scope int, err error) {
switch err.(type) {
case *WorkflowExecutionAlreadyStartedError:
Expand Down
Loading

0 comments on commit 19c32ad

Please sign in to comment.