diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index e88ca0ffe5f..228928ecedd 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -151,10 +151,11 @@ var keys = map[Key]string{ EnableEventsV2: "history.enableEventsV2", NumSystemWorkflows: "history.numSystemWorkflows", - WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", - WorkerReplicatorConcurrency: "worker.replicatorConcurrency", - WorkerReplicatorBufferRetryCount: "worker.replicatorBufferRetryCount", - WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry", + WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", + WorkerReplicatorConcurrency: "worker.replicatorConcurrency", + WorkerReplicatorActivityBufferRetryCount: "worker.replicatorActivityBufferRetryCount", + WorkerReplicatorHistoryBufferRetryCount: "worker.replicatorHistoryBufferRetryCount", + WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry", } const ( @@ -381,8 +382,10 @@ const ( WorkerPersistenceMaxQPS // WorkerReplicatorConcurrency is the max concurrenct tasks to be processed at any given time WorkerReplicatorConcurrency - // WorkerReplicatorBufferRetryCount is the retry attempt when encounter retry error - WorkerReplicatorBufferRetryCount + // WorkerReplicatorActivityBufferRetryCount is the retry attempt when encounter retry error on activity + WorkerReplicatorActivityBufferRetryCount + // WorkerReplicatorHistoryBufferRetryCount is the retry attempt when encounter retry error on history + WorkerReplicatorHistoryBufferRetryCount // WorkerReplicationTaskMaxRetry is the max retry for any task WorkerReplicationTaskMaxRetry diff --git a/common/xdc/historyRereplicator.go b/common/xdc/historyRereplicator.go index 74579f1eebe..df3b61817f7 100644 --- a/common/xdc/historyRereplicator.go +++ b/common/xdc/historyRereplicator.go @@ -412,10 +412,6 @@ func (c *historyRereplicationContext) getHistory(domainID string, workflowID str logger.WithField(logging.TagErr, err).Error("error getting history") return nil, err } - if len(response.HistoryBatches) == 0 { - logger.WithField(logging.TagErr, ErrNoHistoryRawEventBatches).Error(ErrNoHistoryRawEventBatches.Error()) - return nil, ErrNoHistoryRawEventBatches - } return response, nil } @@ -482,8 +478,5 @@ func (c *historyRereplicationContext) deserializeBlob(blob *shared.DataBlob) ([] return nil, ErrUnknownEncodingType } - if len(historyEvents) == 0 { - return nil, ErrEmptyHistoryRawEventBatch - } return historyEvents, nil } diff --git a/service/worker/replicator/processor.go b/service/worker/replicator/processor.go index f033a48159e..6124ddc6e09 100644 --- a/service/worker/replicator/processor.go +++ b/service/worker/replicator/processor.go @@ -378,6 +378,7 @@ func (p *replicationTaskProcessor) handleActivityTask(task *replicator.Replicati sw := p.metricsClient.StartTimer(metrics.SyncActivityTaskScope, metrics.ReplicatorLatency) defer sw.Stop() + var err error attr := task.SyncActicvityTaskAttributes logger.Debugf("Received sync activity task %v.", attr) @@ -394,9 +395,22 @@ func (p *replicationTaskProcessor) handleActivityTask(task *replicator.Replicati Details: attr.Details, Attempt: attr.Attempt, } - ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout) - err := p.historyClient.SyncActivity(ctx, req) - cancel() + +RetryLoop: + for i := 0; i < p.config.ReplicatorActivityBufferRetryCount(); i++ { + ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout) + err = p.historyClient.SyncActivity(ctx, req) + cancel() + + // Replication tasks could be slightly out of order for a particular workflow execution + // We first try to apply the events without buffering enabled with a small delay to account for such delays + // Caller should try to apply the event with buffering enabled once we return RetryTaskError after all retries + if p.isRetryTaskError(err) { + time.Sleep(retryErrorWaitMillis * time.Millisecond) + continue RetryLoop + } + break RetryLoop + } if !p.isRetryTaskError(err) { return err @@ -409,7 +423,7 @@ func (p *replicationTaskProcessor) handleActivityTask(task *replicator.Replicati defer p.rereplicationLock.UnlockID(workflowIdendifier) // before actually trying to re-replicate the missing event, try again - ctx, cancel = context.WithTimeout(context.Background(), replicationTimeout) + ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout) err = p.historyClient.SyncActivity(ctx, req) cancel() @@ -480,7 +494,7 @@ Loop: } RetryLoop: - for i := 0; i < p.config.ReplicatorBufferRetryCount(); i++ { + for i := 0; i < p.config.ReplicatorHistoryBufferRetryCount(); i++ { ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout) err = p.historyClient.ReplicateEvents(ctx, req) cancel() diff --git a/service/worker/replicator/replicator.go b/service/worker/replicator/replicator.go index f84b248efbd..97a0ee6a0b0 100644 --- a/service/worker/replicator/replicator.go +++ b/service/worker/replicator/replicator.go @@ -63,10 +63,11 @@ type ( Config struct { EnableHistoryRereplication dynamicconfig.BoolPropertyFn - PersistenceMaxQPS dynamicconfig.IntPropertyFn - ReplicatorConcurrency dynamicconfig.IntPropertyFn - ReplicatorBufferRetryCount dynamicconfig.IntPropertyFn - ReplicationTaskMaxRetry dynamicconfig.IntPropertyFn + PersistenceMaxQPS dynamicconfig.IntPropertyFn + ReplicatorConcurrency dynamicconfig.IntPropertyFn + ReplicatorActivityBufferRetryCount dynamicconfig.IntPropertyFn + ReplicatorHistoryBufferRetryCount dynamicconfig.IntPropertyFn + ReplicationTaskMaxRetry dynamicconfig.IntPropertyFn } ) diff --git a/service/worker/service.go b/service/worker/service.go index 277efbcf040..b6daf489068 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -82,11 +82,12 @@ func NewService(params *service.BootstrapParams) common.Daemon { func NewConfig(dc *dynamicconfig.Collection) *Config { return &Config{ ReplicationCfg: &replicator.Config{ - EnableHistoryRereplication: dc.GetBoolProperty(dynamicconfig.EnableHistoryRereplication, false), - PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500), - ReplicatorConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorConcurrency, 1000), - ReplicatorBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorBufferRetryCount, 8), - ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 50), + EnableHistoryRereplication: dc.GetBoolProperty(dynamicconfig.EnableHistoryRereplication, true), + PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500), + ReplicatorConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorConcurrency, 1000), + ReplicatorActivityBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorActivityBufferRetryCount, 8), + ReplicatorHistoryBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorHistoryBufferRetryCount, 8), + ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 50), }, SysWorkflowCfg: &sysworkflow.Config{}, }