Skip to content

Commit

Permalink
Log more tags while putting to replication task to dlq (uber#4754)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Mar 2, 2022
1 parent 7084679 commit d83fa3a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 64 deletions.
38 changes: 17 additions & 21 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func NewTaskProcessor(
taskExecutor TaskExecutor,
) TaskProcessor {
shardID := shard.GetShardID()
sourceCluster := taskFetcher.GetSourceCluster()
firstRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID))
firstRetryPolicy.SetMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID))
secondRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorSecondRetryWait(shardID))
Expand All @@ -131,14 +132,14 @@ func NewTaskProcessor(
noTaskRetrier := backoff.NewRetrier(noTaskBackoffPolicy, backoff.SystemClock)
return &taskProcessorImpl{
currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(),
sourceCluster: taskFetcher.GetSourceCluster(),
sourceCluster: sourceCluster,
status: common.DaemonStatusInitialized,
shard: shard,
historyEngine: historyEngine,
historySerializer: persistence.NewPayloadSerializer(),
config: config,
metricsClient: metricsClient,
logger: shard.GetLogger(),
logger: shard.GetLogger().WithTags(tag.SourceCluster(sourceCluster), tag.ShardID(shardID)),
taskExecutor: taskExecutor,
hostRateLimiter: taskFetcher.GetRateLimiter(),
shardRateLimiter: quotas.NewDynamicRateLimiter(config.ReplicationTaskProcessorShardQPS.AsFloat64()),
Expand Down Expand Up @@ -426,17 +427,26 @@ func (p *taskProcessorImpl) processSingleTask(replicationTask *types.Replication
p.logger.Warn("Skip adding new messages to DLQ.", tag.Error(err))
return err
default:
p.logger.Error(
"Failed to apply replication task after retry. Putting task into DLQ.",
tag.TaskID(replicationTask.GetSourceTaskID()),
request, err := p.generateDLQRequest(replicationTask)
if err != nil {
p.logger.Error("Failed to generate DLQ replication task.", tag.Error(err))
// We cannot deserialize the task. Dropping it.
return nil
}
p.logger.Error("Failed to apply replication task after retry. Putting task into DLQ.",
tag.WorkflowDomainID(request.TaskInfo.GetDomainID()),
tag.WorkflowID(request.TaskInfo.GetWorkflowID()),
tag.WorkflowRunID(request.TaskInfo.GetRunID()),
tag.TaskID(request.TaskInfo.GetTaskID()),
tag.TaskType(request.TaskInfo.GetTaskType()),
tag.Error(err),
)
//TODO: uncomment this when the execution fixer workflow is ready
//if err = p.triggerDataInconsistencyScan(replicationTask); err != nil {
// p.logger.Warn("Failed to trigger data scan", tag.Error(err))
// p.metricsClient.IncCounter(metrics.ReplicationDLQStatsScope, metrics.ReplicationDLQValidationFailed)
//}
return p.putReplicationTaskToDLQ(replicationTask)
return p.putReplicationTaskToDLQ(request)
}
}

Expand Down Expand Up @@ -466,21 +476,7 @@ func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTa
return err
}

func (p *taskProcessorImpl) putReplicationTaskToDLQ(replicationTask *types.ReplicationTask) error {
request, err := p.generateDLQRequest(replicationTask)
if err != nil {
p.logger.Error("Failed to generate DLQ replication task.", tag.Error(err))
// We cannot deserialize the task. Dropping it.
return nil
}
p.logger.Info("Put history replication to DLQ",
tag.WorkflowDomainID(request.TaskInfo.GetDomainID()),
tag.WorkflowID(request.TaskInfo.GetWorkflowID()),
tag.WorkflowRunID(request.TaskInfo.GetRunID()),
tag.TaskID(request.TaskInfo.GetTaskID()),
tag.ShardID(p.shard.GetShardID()),
)

func (p *taskProcessorImpl) putReplicationTaskToDLQ(request *persistence.PutReplicationTaskToDLQRequest) error {
p.metricsClient.Scope(
metrics.ReplicationDLQStatsScope,
metrics.TargetClusterTag(p.sourceCluster),
Expand Down
51 changes: 8 additions & 43 deletions service/history/replication/task_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,70 +176,35 @@ func (s *taskProcessorSuite) TestHandleSyncShardStatus() {
}

func (s *taskProcessorSuite) TestPutReplicationTaskToDLQ_SyncActivityReplicationTask() {
domainID := uuid.New()
workflowID := uuid.New()
runID := uuid.New()
task := &types.ReplicationTask{
TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(),
SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
},
}
request := &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: "standby",
TaskInfo: &persistence.ReplicationTaskInfo{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
DomainID: uuid.New(),
WorkflowID: uuid.New(),
RunID: uuid.New(),
TaskType: persistence.ReplicationTaskTypeSyncActivity,
},
}
s.executionManager.On("PutReplicationTaskToDLQ", mock.Anything, request).Return(nil)
err := s.taskProcessor.putReplicationTaskToDLQ(task)
err := s.taskProcessor.putReplicationTaskToDLQ(request)
s.NoError(err)
}

func (s *taskProcessorSuite) TestPutReplicationTaskToDLQ_HistoryV2ReplicationTask() {
domainID := uuid.New()
workflowID := uuid.New()
runID := uuid.New()
events := []*types.HistoryEvent{
{
EventID: 1,
Version: 1,
},
}
serializer := s.mockShard.GetPayloadSerializer()
data, err := serializer.SerializeBatchEvents(events, common.EncodingTypeThriftRW)
s.NoError(err)
task := &types.ReplicationTask{
TaskType: types.ReplicationTaskTypeHistoryV2.Ptr(),
HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
Events: &types.DataBlob{
EncodingType: types.EncodingTypeThriftRW.Ptr(),
Data: data.Data,
},
},
}
request := &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: "standby",
TaskInfo: &persistence.ReplicationTaskInfo{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
DomainID: uuid.New(),
WorkflowID: uuid.New(),
RunID: uuid.New(),
TaskType: persistence.ReplicationTaskTypeHistory,
FirstEventID: 1,
NextEventID: 2,
Version: 1,
},
}
s.executionManager.On("PutReplicationTaskToDLQ", mock.Anything, request).Return(nil)
err = s.taskProcessor.putReplicationTaskToDLQ(task)
err := s.taskProcessor.putReplicationTaskToDLQ(request)
s.NoError(err)
}

Expand Down

0 comments on commit d83fa3a

Please sign in to comment.