Skip to content

Commit

Permalink
Add flag for replication cleanup process (#3241)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed May 12, 2020
1 parent c5f5276 commit 1399966
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 12 deletions.
2 changes: 1 addition & 1 deletion common/persistence/domainReplicationQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (q *domainReplicationQueueImpl) Start() {
}

func (q *domainReplicationQueueImpl) Stop() {
if !atomic.CompareAndSwapInt32(&q.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
if !atomic.CompareAndSwapInt32(&q.status, common.DaemonStatusInitialized, common.DaemonStatusStopped) {
return
}
close(q.done)
Expand Down
17 changes: 16 additions & 1 deletion common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ var keys = map[Key]string{
FrontendThrottledLogRPS: "frontend.throttledLogRPS",
EnableClientVersionCheck: "frontend.enableClientVersionCheck",
ValidSearchAttributes: "frontend.validSearchAttributes",
SendRawWorkflowHistory: "frontend.sendRawWorkflowHistory",
FrontendEnableRPCReplication: "frontend.enableRPCReplication",
FrontendEnableCleanupReplicationTask: "frontend.enableCleanupReplicationTask",
SearchAttributesNumberOfKeysLimit: "frontend.searchAttributesNumberOfKeysLimit",
SearchAttributesSizeOfValueLimit: "frontend.searchAttributesSizeOfValueLimit",
SearchAttributesTotalSizeLimit: "frontend.searchAttributesTotalSizeLimit",
Expand Down Expand Up @@ -225,7 +228,7 @@ var keys = map[Key]string{
MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore",
ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster",
EnableDropStuckTaskByDomainID: "history.DropStuckTaskByDomain",
SkipReapplicationByDomainId: "history.SkipReapplicationByDomainId",
SkipReapplicationByDomainId: "history.SkipReapplicationByDomainId",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS",
Expand Down Expand Up @@ -368,6 +371,12 @@ const (
FrontendMaxBadBinaries
// ValidSearchAttributes is legal indexed keys that can be used in list APIs
ValidSearchAttributes
// SendRawWorkflowHistory is whether to enable raw history retrieving
SendRawWorkflowHistory
// FrontendEnableRPCReplication is a feature flag for rpc replication
FrontendEnableRPCReplication
// FrontendEnableCleanupReplicationTask is a feature flag for rpc replication cleanup
FrontendEnableCleanupReplicationTask
// SearchAttributesNumberOfKeysLimit is the limit of number of keys
SearchAttributesNumberOfKeysLimit
// SearchAttributesSizeOfValueLimit is the size limit of each value
Expand Down Expand Up @@ -677,6 +686,12 @@ const (
ReplicationTaskProcessorCleanupInterval
// ReplicationTaskProcessorCleanupJitterCoefficient is the jitter for cleanup timer
ReplicationTaskProcessorCleanupJitterCoefficient
// HistoryEnableRPCReplication is the feature flag for RPC replication
HistoryEnableRPCReplication
// HistoryEnableKafkaReplication is the migration flag for Kafka replication
HistoryEnableKafkaReplication
// HistoryEnableCleanupReplicationTask is the feature flag for replication task cleanup
HistoryEnableCleanupReplicationTask
// EnableConsistentQuery indicates if consistent query is enabled for the cluster
EnableConsistentQuery
// EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain
Expand Down
4 changes: 3 additions & 1 deletion service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ func (adh *AdminHandler) RegisterHandler() {
// Start starts the handler
func (adh *AdminHandler) Start() {
// Start domain replication queue cleanup
adh.Resource.GetDomainReplicationQueue().Start()
if adh.config.EnableCleanupReplicationTask() {
adh.Resource.GetDomainReplicationQueue().Start()
}
}

// Stop stops the handler
Expand Down
8 changes: 5 additions & 3 deletions service/frontend/adminHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func (s *adminHandlerSuite) SetupTest() {
},
}
config := &Config{
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(false),
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(false),
EnableCleanupReplicationTask: dynamicconfig.GetBoolPropertyFn(false),
}
s.handler = NewAdminHandler(s.mockResource, params, config)
s.handler.Start()
Expand Down Expand Up @@ -552,8 +553,9 @@ func (s *adminHandlerSuite) Test_AddSearchAttribute_Permission() {
ctx := context.Background()
handler := s.handler
handler.config = &Config{
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(true),
AdminOperationToken: dynamicconfig.GetStringPropertyFn(common.DefaultAdminOperationToken),
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(true),
AdminOperationToken: dynamicconfig.GetStringPropertyFn(common.DefaultAdminOperationToken),
EnableCleanupReplicationTask: dynamicconfig.GetBoolPropertyFn(false),
}

type test struct {
Expand Down
8 changes: 8 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type Config struct {

// VisibilityArchival system protection
VisibilityArchivalQueryMaxPageSize dynamicconfig.IntPropertyFn

SendRawWorkflowHistory dynamicconfig.BoolPropertyFnWithDomainFilter

EnableRPCReplication dynamicconfig.BoolPropertyFn
EnableCleanupReplicationTask dynamicconfig.BoolPropertyFn
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -129,6 +134,9 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.MinRetentionDays),
VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000),
DisallowQuery: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisallowQuery, false),
SendRawWorkflowHistory: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.SendRawWorkflowHistory, false),
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false),
EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true),
}
}

Expand Down
10 changes: 6 additions & 4 deletions service/history/replicationTaskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,12 @@ func (p *ReplicationTaskProcessorImpl) cleanupReplicationTaskLoop() {
timer.Stop()
return
case <-timer.C:
err := p.cleanupAckedReplicationTasks()
if err != nil {
p.logger.Error("Failed to clean up replication messages.", tag.Error(err))
p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupFailure)
if p.config.EnableCleanupReplicationTask() {
err := p.cleanupAckedReplicationTasks()
if err != nil {
p.logger.Error("Failed to clean up replication messages.", tag.Error(err))
p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupFailure)
}
}
timer.Reset(backoff.JitDuration(
p.config.ShardSyncMinInterval(),
Expand Down
7 changes: 5 additions & 2 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ type Config struct {
ReplicationTaskProcessorCleanupInterval dynamicconfig.DurationPropertyFn
ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFn

EnableCleanupReplicationTask dynamicconfig.BoolPropertyFn

// The following are used by consistent query
EnableConsistentQuery dynamicconfig.BoolPropertyFn
EnableConsistentQueryByDomain dynamicconfig.BoolPropertyFnWithDomainFilter
Expand All @@ -207,7 +209,7 @@ type Config struct {
ReplicationEventsFromCurrentCluster dynamicconfig.BoolPropertyFnWithDomainFilter

EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
SkipReapplicationByDomainId dynamicconfig.BoolPropertyFnWithDomainIDFilter
SkipReapplicationByDomainId dynamicconfig.BoolPropertyFnWithDomainIDFilter
}

const (
Expand Down Expand Up @@ -331,6 +333,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
ReplicationTaskProcessorNoTaskRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskProcessorNoTaskInitialWait, 2*time.Second),
ReplicationTaskProcessorCleanupInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskProcessorCleanupInterval, 1*time.Minute),
ReplicationTaskProcessorCleanupJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorCleanupJitterCoefficient, 0.15),
EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.HistoryEnableCleanupReplicationTask, true),

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery, true),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableConsistentQueryByDomain, false),
Expand All @@ -342,7 +345,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
ReplicationEventsFromCurrentCluster: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.ReplicationEventsFromCurrentCluster, false),

EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFnWithDomainIDFilter(dynamicconfig.EnableDropStuckTaskByDomainID, false),
SkipReapplicationByDomainId: dc.GetBoolPropertyFnWithDomainIDFilter(dynamicconfig.SkipReapplicationByDomainId, false),
SkipReapplicationByDomainId: dc.GetBoolPropertyFnWithDomainIDFilter(dynamicconfig.SkipReapplicationByDomainId, false),
}

return cfg
Expand Down

0 comments on commit 1399966

Please sign in to comment.