Skip to content

Commit

Permalink
Make sync activity retry multiple times before fetch history from rem…
Browse files Browse the repository at this point in the history
…ote (uber#1379)

* Make sync activity retry multiple times before fetching history from remote
  • Loading branch information
wxing1292 authored Jan 9, 2019
1 parent 7a0afef commit 8ad3979
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 32 deletions.
25 changes: 14 additions & 11 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,16 @@ var keys = map[Key]string{
EnableEventsV2: "history.enableEventsV2",
NumSystemWorkflows: "history.numSystemWorkflows",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorConcurrency: "worker.replicatorConcurrency",
WorkerReplicatorBufferRetryCount: "worker.replicatorBufferRetryCount",
WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry",
WorkerIndexerConcurrency: "worker.indexerConcurrency",
WorkerESProcessorNumOfWorkers: "worker.ESProcessorNumOfWorkers",
WorkerESProcessorBulkActions: "worker.ESProcessorBulkActions",
WorkerESProcessorBulkSize: "worker.ESProcessorBulkSize",
WorkerESProcessorFlushInterval: "worker.ESProcessorFlushInterval",
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorConcurrency: "worker.replicatorConcurrency",
WorkerReplicatorActivityBufferRetryCount: "worker.replicatorActivityBufferRetryCount",
WorkerReplicatorHistoryBufferRetryCount: "worker.replicatorHistoryBufferRetryCount",
WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry",
WorkerIndexerConcurrency: "worker.indexerConcurrency",
WorkerESProcessorNumOfWorkers: "worker.ESProcessorNumOfWorkers",
WorkerESProcessorBulkActions: "worker.ESProcessorBulkActions",
WorkerESProcessorBulkSize: "worker.ESProcessorBulkSize",
WorkerESProcessorFlushInterval: "worker.ESProcessorFlushInterval",
}

const (
Expand Down Expand Up @@ -386,8 +387,10 @@ const (
WorkerPersistenceMaxQPS
// WorkerReplicatorConcurrency is the max concurrent tasks to be processed at any given time
WorkerReplicatorConcurrency
// WorkerReplicatorBufferRetryCount is the retry attempt when encounter retry error
WorkerReplicatorBufferRetryCount
// WorkerReplicatorActivityBufferRetryCount is the number of retry attempts when encountering a retry error on activity
WorkerReplicatorActivityBufferRetryCount
// WorkerReplicatorHistoryBufferRetryCount is the number of retry attempts when encountering a retry error on history
WorkerReplicatorHistoryBufferRetryCount
// WorkerReplicationTaskMaxRetry is the max retry for any task
WorkerReplicationTaskMaxRetry
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time
Expand Down
7 changes: 0 additions & 7 deletions common/xdc/historyRereplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,6 @@ func (c *historyRereplicationContext) getHistory(domainID string, workflowID str
logger.WithField(logging.TagErr, err).Error("error getting history")
return nil, err
}
if len(response.HistoryBatches) == 0 {
logger.WithField(logging.TagErr, ErrNoHistoryRawEventBatches).Error(ErrNoHistoryRawEventBatches.Error())
return nil, ErrNoHistoryRawEventBatches
}

return response, nil
}
Expand Down Expand Up @@ -482,8 +478,5 @@ func (c *historyRereplicationContext) deserializeBlob(blob *shared.DataBlob) ([]
return nil, ErrUnknownEncodingType
}

if len(historyEvents) == 0 {
return nil, ErrEmptyHistoryRawEventBatch
}
return historyEvents, nil
}
24 changes: 19 additions & 5 deletions service/worker/replicator/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (p *replicationTaskProcessor) handleActivityTask(task *replicator.Replicati
sw := p.metricsClient.StartTimer(metrics.SyncActivityTaskScope, metrics.ReplicatorLatency)
defer sw.Stop()

var err error
attr := task.SyncActicvityTaskAttributes
logger.Debugf("Received sync activity task %v.", attr)

Expand All @@ -394,9 +395,22 @@ func (p *replicationTaskProcessor) handleActivityTask(task *replicator.Replicati
Details: attr.Details,
Attempt: attr.Attempt,
}
ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout)
err := p.historyClient.SyncActivity(ctx, req)
cancel()

RetryLoop:
for i := 0; i < p.config.ReplicatorActivityBufferRetryCount(); i++ {
ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout)
err = p.historyClient.SyncActivity(ctx, req)
cancel()

// Replication tasks could be slightly out of order for a particular workflow execution.
// We first try to apply the events without buffering enabled, with a small delay between attempts, to account for such reordering.
// The caller should try to apply the event with buffering enabled once we return RetryTaskError after all retries.
if p.isRetryTaskError(err) {
time.Sleep(retryErrorWaitMillis * time.Millisecond)
continue RetryLoop
}
break RetryLoop
}

if !p.isRetryTaskError(err) {
return err
Expand All @@ -409,7 +423,7 @@ func (p *replicationTaskProcessor) handleActivityTask(task *replicator.Replicati
defer p.rereplicationLock.UnlockID(workflowIdendifier)

// before actually trying to re-replicate the missing event, try again
ctx, cancel = context.WithTimeout(context.Background(), replicationTimeout)
ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout)
err = p.historyClient.SyncActivity(ctx, req)
cancel()

Expand Down Expand Up @@ -485,7 +499,7 @@ Loop:
}

RetryLoop:
for i := 0; i < p.config.ReplicatorBufferRetryCount(); i++ {
for i := 0; i < p.config.ReplicatorHistoryBufferRetryCount(); i++ {
ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout)
err = p.historyClient.ReplicateEvents(ctx, req)
cancel()
Expand Down
9 changes: 5 additions & 4 deletions service/worker/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ type (
Config struct {
EnableHistoryRereplication dynamicconfig.BoolPropertyFn

PersistenceMaxQPS dynamicconfig.IntPropertyFn
ReplicatorConcurrency dynamicconfig.IntPropertyFn
ReplicatorBufferRetryCount dynamicconfig.IntPropertyFn
ReplicationTaskMaxRetry dynamicconfig.IntPropertyFn
PersistenceMaxQPS dynamicconfig.IntPropertyFn
ReplicatorConcurrency dynamicconfig.IntPropertyFn
ReplicatorActivityBufferRetryCount dynamicconfig.IntPropertyFn
ReplicatorHistoryBufferRetryCount dynamicconfig.IntPropertyFn
ReplicationTaskMaxRetry dynamicconfig.IntPropertyFn
}
)

Expand Down
11 changes: 6 additions & 5 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ func NewService(params *service.BootstrapParams) common.Daemon {
func NewConfig(dc *dynamicconfig.Collection) *Config {
return &Config{
ReplicationCfg: &replicator.Config{
EnableHistoryRereplication: dc.GetBoolProperty(dynamicconfig.EnableHistoryRereplication, false),
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500),
ReplicatorConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorConcurrency, 1000),
ReplicatorBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorBufferRetryCount, 8),
ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 50),
EnableHistoryRereplication: dc.GetBoolProperty(dynamicconfig.EnableHistoryRereplication, true),
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500),
ReplicatorConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorConcurrency, 1000),
ReplicatorActivityBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorActivityBufferRetryCount, 8),
ReplicatorHistoryBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorHistoryBufferRetryCount, 8),
ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 50),
},
SysWorkflowCfg: &sysworkflow.Config{},
IndexerCfg: &indexer.Config{
Expand Down

0 comments on commit 8ad3979

Please sign in to comment.