Skip to content

Commit

Permalink
Enable graceful shutdown of replication task fetcher by default
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed Dec 18, 2024
1 parent f4e219a commit 82f640a
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 29 deletions.
11 changes: 0 additions & 11 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,12 +1708,6 @@ const (
// Default value: false
// Allowed filters: N/A
QueueProcessorEnableGracefulSyncShutdown
// ReplicationTaskFetcherEnableGracefulSyncShutdown indicates whether task fetcher should be shutdown gracefully & synchronously
// KeyName: history.replicationTaskFetcherEnableGracefulSyncShutdown
// Value type: Bool
// Default value: false
// Allowed filters: N/A
ReplicationTaskFetcherEnableGracefulSyncShutdown
// TransferProcessorEnableValidator is whether validator should be enabled for transferQueueProcessor
// KeyName: history.transferProcessorEnableValidator
// Value type: Bool
Expand Down Expand Up @@ -4109,11 +4103,6 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "QueueProcessorEnableGracefulSyncShutdown indicates whether processing queue should be shutdown gracefully & synchronously",
DefaultValue: false,
},
ReplicationTaskFetcherEnableGracefulSyncShutdown: {
KeyName: "history.replicationTaskFetcherEnableGracefulSyncShutdown",
Description: "ReplicationTaskFetcherEnableGracefulSyncShutdown is whether we should gracefully drain replication task fetcher on shutdown",
DefaultValue: false,
},
TransferProcessorEnableValidator: {
KeyName: "history.transferProcessorEnableValidator",
Description: "TransferProcessorEnableValidator is whether validator should be enabled for transferQueueProcessor",
Expand Down
4 changes: 0 additions & 4 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ type Config struct {
ReplicationTaskFetcherTimerJitterCoefficient dynamicconfig.FloatPropertyFn
ReplicationTaskFetcherErrorRetryWait dynamicconfig.DurationPropertyFn
ReplicationTaskFetcherServiceBusyWait dynamicconfig.DurationPropertyFn
ReplicationTaskFetcherEnableGracefulSyncShutdown dynamicconfig.BoolPropertyFn
ReplicationTaskProcessorErrorRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorErrorRetryMaxAttempts dynamicconfig.IntPropertyFnWithShardIDFilter
ReplicationTaskProcessorErrorSecondRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
Expand Down Expand Up @@ -500,7 +499,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient),
ReplicationTaskFetcherErrorRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherErrorRetryWait),
ReplicationTaskFetcherServiceBusyWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherServiceBusyWait),
ReplicationTaskFetcherEnableGracefulSyncShutdown: dc.GetBoolProperty(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown),
ReplicationTaskProcessorErrorRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryWait),
ReplicationTaskProcessorErrorRetryMaxAttempts: dc.GetIntPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts),
ReplicationTaskProcessorErrorSecondRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorSecondRetryWait),
Expand Down Expand Up @@ -606,7 +604,6 @@ func NewForTestByShardNumber(shardNumber int) *Config {
"1": 50,
}))
panicIfErr(inMem.UpdateValue(dynamicconfig.QueueProcessorRandomSplitProbability, 0.5))
panicIfErr(inMem.UpdateValue(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown, true))
panicIfErr(inMem.UpdateValue(dynamicconfig.EnableStrongIdempotency, true))

dc := dynamicconfig.NewCollection(inMem, log.NewNoop())
Expand All @@ -631,7 +628,6 @@ func NewForTestByShardNumber(shardNumber int) *Config {
config.QueueProcessorPendingTaskSplitThreshold = dc.GetMapProperty(dynamicconfig.QueueProcessorPendingTaskSplitThreshold)
config.QueueProcessorStuckTaskSplitThreshold = dc.GetMapProperty(dynamicconfig.QueueProcessorStuckTaskSplitThreshold)
config.QueueProcessorRandomSplitProbability = dc.GetFloat64Property(dynamicconfig.QueueProcessorRandomSplitProbability)
config.ReplicationTaskFetcherEnableGracefulSyncShutdown = dc.GetBoolProperty(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown)
return config
}

Expand Down
1 change: 0 additions & 1 deletion service/history/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ func TestNewConfig(t *testing.T) {
"ReplicationTaskFetcherTimerJitterCoefficient": {dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 9.0},
"ReplicationTaskFetcherErrorRetryWait": {dynamicconfig.ReplicationTaskFetcherErrorRetryWait, time.Second},
"ReplicationTaskFetcherServiceBusyWait": {dynamicconfig.ReplicationTaskFetcherServiceBusyWait, time.Second},
"ReplicationTaskFetcherEnableGracefulSyncShutdown": {dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown, true},
"ReplicationTaskProcessorErrorRetryWait": {dynamicconfig.ReplicationTaskProcessorErrorRetryWait, time.Second},
"ReplicationTaskProcessorErrorRetryMaxAttempts": {dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts, 86},
"ReplicationTaskProcessorErrorSecondRetryWait": {dynamicconfig.ReplicationTaskProcessorErrorSecondRetryWait, time.Second},
Expand Down
18 changes: 5 additions & 13 deletions service/history/replication/task_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,11 @@ func (f *taskFetcherImpl) Stop() {
}

f.cancelCtx()
// TODO: remove this config and disable non graceful shutdown
if f.config.ReplicationTaskFetcherEnableGracefulSyncShutdown() {
if !common.AwaitWaitGroup(&f.wg, 10*time.Second) {
f.logger.Warn("Replication task fetcher timed out on shutdown.")
} else {
f.logger.Info("Replication task fetcher graceful shutdown completed.")
}
if !common.AwaitWaitGroup(&f.wg, 10*time.Second) {
f.logger.Warn("Replication task fetcher timed out on shutdown.")
} else {
f.logger.Info("Replication task fetcher graceful shutdown completed.")
}
f.logger.Info("Replication task fetcher stopped.")
}

// fetchTasks collects getReplicationTasks request from shards and send out aggregated request to source frontend.
Expand Down Expand Up @@ -291,11 +287,7 @@ func (f *taskFetcherImpl) getMessages(requestByShard map[int32]*request) (map[in
tokens = append(tokens, request.token)
}

parentCtx := f.ctx
if !f.config.ReplicationTaskFetcherEnableGracefulSyncShutdown() {
parentCtx = context.Background()
}
ctx, cancel := context.WithTimeout(parentCtx, fetchTaskRequestTimeout)
ctx, cancel := context.WithTimeout(f.ctx, fetchTaskRequestTimeout)
defer cancel()

request := &types.GetReplicationMessagesRequest{
Expand Down

0 comments on commit 82f640a

Please sign in to comment.