
Commit

Merge branch 'master' into better-postgres-defaults
yycptt authored May 9, 2020
2 parents 1625644 + df82425 commit 4f3b43c
Showing 10 changed files with 46 additions and 35 deletions.
LICENSE (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
 The MIT License (MIT)
 
-Copyright (c) 2020 Uber Technologies, Inc.
+Copyright (c) 2017-2020 Uber Technologies Inc.
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
common/persistence/domainReplicationQueue.go (2 changes: 1 addition & 1 deletion)
@@ -101,7 +101,7 @@ func (q *domainReplicationQueueImpl) Start() {
 }
 
 func (q *domainReplicationQueueImpl) Stop() {
-	if !atomic.CompareAndSwapInt32(&q.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
+	if !atomic.CompareAndSwapInt32(&q.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
 		return
 	}
 	close(q.done)
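
A note on the fix above: Stop() had copy-pasted Start()'s CAS guard, attempting the Initialized -> Started transition. Once the queue was running, that CAS always failed, so Stop() returned early and close(q.done) was never reached. A minimal sketch of the daemon-status idiom, with the status constants inlined rather than taken from cadence's common package:

package main

import (
	"fmt"
	"sync/atomic"
)

// Status values mirroring common.DaemonStatus* (inlined for illustration).
const (
	statusInitialized int32 = iota
	statusStarted
	statusStopped
)

type queue struct {
	status int32
	done   chan struct{}
}

// Start runs at most once: Initialized -> Started.
func (q *queue) Start() {
	if !atomic.CompareAndSwapInt32(&q.status, statusInitialized, statusStarted) {
		return
	}
	// ...spawn background loops that select on q.done...
}

// Stop must use the Started -> Stopped transition. Reusing Start's
// transition (the original bug) makes Stop a no-op once Start has run,
// so q.done is never closed and the background loops leak.
func (q *queue) Stop() {
	if !atomic.CompareAndSwapInt32(&q.status, statusStarted, statusStopped) {
		return
	}
	close(q.done)
}

func main() {
	q := &queue{done: make(chan struct{})}
	q.Start()
	q.Stop() // closes q.done exactly once; a second Stop is a safe no-op
	fmt.Println("stopped")
}

The corrected guard also makes Stop() safe on a queue that was never started: the Started -> Stopped CAS fails and Stop() returns without touching q.done, which is what lets AdminHandler.Stop() below call it unconditionally.
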
common/service/dynamicconfig/constants.go (6 changes: 6 additions & 0 deletions)
@@ -101,6 +101,7 @@ var keys = map[Key]string{
 	ValidSearchAttributes: "frontend.validSearchAttributes",
 	SendRawWorkflowHistory: "frontend.sendRawWorkflowHistory",
 	FrontendEnableRPCReplication: "frontend.enableRPCReplication",
+	FrontendEnableCleanupReplicationTask: "frontend.enableCleanupReplicationTask",
 	SearchAttributesNumberOfKeysLimit: "frontend.searchAttributesNumberOfKeysLimit",
 	SearchAttributesSizeOfValueLimit: "frontend.searchAttributesSizeOfValueLimit",
 	SearchAttributesTotalSizeLimit: "frontend.searchAttributesTotalSizeLimit",
@@ -236,6 +237,7 @@ var keys = map[Key]string{
 	ReplicationTaskProcessorCleanupJitterCoefficient: "history.ReplicationTaskProcessorCleanupJitterCoefficient",
 	HistoryEnableRPCReplication: "history.EnableRPCReplication",
 	HistoryEnableKafkaReplication: "history.EnableKafkaReplication",
+	HistoryEnableCleanupReplicationTask: "history.EnableCleanupReplicationTask",
 	EnableConsistentQuery: "history.EnableConsistentQuery",
 	EnableConsistentQueryByDomain: "history.EnableConsistentQueryByDomain",
 	MaxBufferedQueryCount: "history.MaxBufferedQueryCount",
@@ -391,6 +393,8 @@ const (
 	SendRawWorkflowHistory
 	// FrontendEnableRPCReplication is a feature flag for rpc replication
 	FrontendEnableRPCReplication
+	// FrontendEnableCleanupReplicationTask is a feature flag for rpc replication cleanup
+	FrontendEnableCleanupReplicationTask
 	// SearchAttributesNumberOfKeysLimit is the limit of number of keys
 	SearchAttributesNumberOfKeysLimit
 	// SearchAttributesSizeOfValueLimit is the size limit of each value
@@ -733,6 +737,8 @@ const (
 	HistoryEnableRPCReplication
 	// HistoryEnableKafkaReplication is the migration flag for Kafka replication
 	HistoryEnableKafkaReplication
+	// HistoryEnableCleanupReplicationTask is the flag for enabling replication task cleanup
+	HistoryEnableCleanupReplicationTask
 	// EnableConsistentQuery indicates if consistent query is enabled for the cluster
 	EnableConsistentQuery
 	// EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain
service/frontend/adminHandler.go (6 changes: 5 additions & 1 deletion)
@@ -117,11 +117,15 @@ func (adh *AdminHandler) RegisterHandler() {
 // Start starts the handler
 func (adh *AdminHandler) Start() {
 	// Start domain replication queue cleanup
-	adh.Resource.GetDomainReplicationQueue().Start()
+	if adh.config.EnableCleanupReplicationTask() {
+		// If the queue does not start, we can still call stop()
+		adh.Resource.GetDomainReplicationQueue().Start()
+	}
 }
 
 // Stop stops the handler
 func (adh *AdminHandler) Stop() {
+	// Calling stop if the queue does not start is ok
 	adh.Resource.GetDomainReplicationQueue().Stop()
 }
service/frontend/adminHandler_test.go (2 changes: 2 additions & 0 deletions)
@@ -93,6 +93,7 @@ func (s *adminHandlerSuite) SetupTest() {
 	}
 	config := &Config{
 		EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(false),
+		EnableCleanupReplicationTask: dynamicconfig.GetBoolPropertyFn(false),
 	}
 	s.handler = NewAdminHandler(s.mockResource, params, config)
 	s.handler.Start()
@@ -554,6 +555,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttribute_Permission() {
 	handler.config = &Config{
 		EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(true),
 		AdminOperationToken:   dynamicconfig.GetStringPropertyFn(common.DefaultAdminOperationToken),
+		EnableCleanupReplicationTask: dynamicconfig.GetBoolPropertyFn(false),
 	}
 
 	type test struct {
service/frontend/service.go (4 changes: 3 additions & 1 deletion)
@@ -94,7 +94,8 @@ type Config struct {
 
 	SendRawWorkflowHistory dynamicconfig.BoolPropertyFnWithDomainFilter
 
-	EnableRPCReplication dynamicconfig.BoolPropertyFn
+	EnableRPCReplication         dynamicconfig.BoolPropertyFn
+	EnableCleanupReplicationTask dynamicconfig.BoolPropertyFn
 }
 
 // NewConfig returns new service config with default values
@@ -135,6 +136,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
 		DisallowQuery:          dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisallowQuery, false),
 		SendRawWorkflowHistory: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.SendRawWorkflowHistory, false),
 		EnableRPCReplication:   dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false),
+		EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true),
 	}
 }
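
Both services wire the new flag through the same dynamic-config pattern: GetBoolProperty captures a key and a default and returns a function, so the value is re-read on every call rather than frozen when NewConfig runs. A rough, self-contained sketch of that shape (an assumed simplification for illustration, not cadence's actual dynamicconfig implementation):

package main

import "fmt"

type boolPropertyFn func() bool

// collection is a toy stand-in for dynamicconfig.Collection, backed by a map.
type collection struct {
	store map[string]bool
}

// GetBoolProperty returns a closure that consults the backing store on
// every call and falls back to the default when the key is unset.
func (c *collection) GetBoolProperty(key string, defaultValue bool) boolPropertyFn {
	return func() bool {
		if v, ok := c.store[key]; ok {
			return v
		}
		return defaultValue
	}
}

func main() {
	dc := &collection{store: map[string]bool{}}
	enabled := dc.GetBoolProperty("frontend.enableCleanupReplicationTask", true)
	fmt.Println(enabled()) // true: the default applies

	dc.store["frontend.enableCleanupReplicationTask"] = false
	fmt.Println(enabled()) // false: the change is visible without rebuilding Config
}

Note the default here is true, so replication task cleanup stays enabled unless an operator explicitly turns the key off.
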
service/history/config/config.go (2 changes: 2 additions & 0 deletions)
@@ -206,6 +206,7 @@ type Config struct {
 	// TODO: those two flags are for migration. Consider remove them after the migration complete
 	EnableRPCReplication   dynamicconfig.BoolPropertyFn
 	EnableKafkaReplication dynamicconfig.BoolPropertyFn
+	EnableCleanupReplicationTask dynamicconfig.BoolPropertyFn
 
 	// The following are used by consistent query
 	EnableConsistentQuery dynamicconfig.BoolPropertyFn
@@ -371,6 +372,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
 		ReplicationTaskProcessorCleanupJitterCoefficient: dc.GetFloat64PropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorCleanupJitterCoefficient, 0.15),
 		EnableRPCReplication:   dc.GetBoolProperty(dynamicconfig.HistoryEnableRPCReplication, false),
 		EnableKafkaReplication: dc.GetBoolProperty(dynamicconfig.HistoryEnableKafkaReplication, true),
+		EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.HistoryEnableCleanupReplicationTask, true),
 
 		EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery, true),
 		EnableConsistentQueryByDomain: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableConsistentQueryByDomain, false),
service/history/events/cache.go (12 changes: 7 additions & 5 deletions)
@@ -136,8 +136,11 @@ func newEventKey(domainID, workflowID, runID string, eventID int64) eventKey {
 	}
 }
 
-func (e *cacheImpl) GetEvent(domainID, workflowID, runID string, firstEventID, eventID int64,
-	branchToken []byte) (*shared.HistoryEvent, error) {
+func (e *cacheImpl) GetEvent(
+	domainID, workflowID, runID string,
+	firstEventID, eventID int64,
+	branchToken []byte,
+) (*shared.HistoryEvent, error) {
 	e.metricsClient.IncCounter(metrics.EventsCacheGetEventScope, metrics.CacheRequests)
 	sw := e.metricsClient.StartTimer(metrics.EventsCacheGetEventScope, metrics.CacheLatency)
 	defer sw.Stop()
@@ -152,7 +155,7 @@ func (e *cacheImpl) GetEvent(domainID, workflowID, runID string, firstEventID, e
 	}
 
 	e.metricsClient.IncCounter(metrics.EventsCacheGetEventScope, metrics.CacheMissCounter)
-	event, err := e.getHistoryEventFromStore(domainID, workflowID, runID, firstEventID, eventID, branchToken)
+	event, err := e.getHistoryEventFromStore(firstEventID, eventID, branchToken)
 	if err != nil {
 		e.metricsClient.IncCounter(metrics.EventsCacheGetEventScope, metrics.CacheFailures)
 		e.logger.Error("EventsCache unable to retrieve event from store",
@@ -186,8 +189,7 @@ func (e *cacheImpl) DeleteEvent(domainID, workflowID, runID string, eventID int6
 	e.Delete(key)
 }
 
-func (e *cacheImpl) getHistoryEventFromStore(domainID, workflowID, runID string, firstEventID, eventID int64,
-	branchToken []byte) (*shared.HistoryEvent, error) {
+func (e *cacheImpl) getHistoryEventFromStore(firstEventID, eventID int64, branchToken []byte) (*shared.HistoryEvent, error) {
 	e.metricsClient.IncCounter(metrics.EventsCacheGetFromStoreScope, metrics.CacheRequests)
 	sw := e.metricsClient.StartTimer(metrics.EventsCacheGetFromStoreScope, metrics.CacheLatency)
 	defer sw.Stop()
service/history/execution/context.go (35 changes: 13 additions & 22 deletions)
@@ -31,13 +31,11 @@ import (
 	workflow "github.com/uber/cadence/.gen/go/shared"
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/backoff"
-	"github.com/uber/cadence/common/clock"
 	"github.com/uber/cadence/common/locks"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/persistence"
-	"github.com/uber/cadence/service/history/engine"
 	"github.com/uber/cadence/service/history/events"
 	"github.com/uber/cadence/service/history/shard"
 )
@@ -148,11 +146,9 @@ type (
 		domainID          string
 		workflowExecution workflow.WorkflowExecution
 		shard             shard.Context
-		engine            engine.Engine
 		executionManager  persistence.ExecutionManager
 		logger            log.Logger
 		metricsClient     metrics.Client
-		timeSource        clock.TimeSource
 
 		mutex        locks.Mutex
 		mutableState MutableState
@@ -179,11 +175,9 @@ func NewContext(
 		domainID:          domainID,
 		workflowExecution: execution,
 		shard:             shard,
-		engine:            shard.GetEngine(),
 		executionManager:  executionManager,
 		logger:            logger,
 		metricsClient:     shard.GetMetricsClient(),
-		timeSource:        shard.GetTimeSource(),
 		mutex:             locks.NewMutex(),
 		stats: &persistence.ExecutionStats{
 			HistorySize: 0,
@@ -562,7 +556,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
 
 	workflowState, workflowCloseState := resetMutableState.GetWorkflowStateCloseStatus()
 	// Current branch changed and notify the watchers
-	c.engine.NotifyNewHistoryEvent(events.NewNotification(
+	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
 		c.domainID,
 		&c.workflowExecution,
 		resetMutableState.GetLastFirstEventID(),
@@ -759,7 +753,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
 		return err
 	}
 	workflowState, workflowCloseState := c.mutableState.GetWorkflowStateCloseStatus()
-	c.engine.NotifyNewHistoryEvent(events.NewNotification(
+	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
 		c.domainID,
 		&c.workflowExecution,
 		c.mutableState.GetLastFirstEventID(),
@@ -815,9 +809,9 @@ func (c *contextImpl) notifyTasks(
 	replicationTasks []persistence.Task,
 	timerTasks []persistence.Task,
 ) {
-	c.engine.NotifyNewTransferTasks(transferTasks)
-	c.engine.NotifyNewReplicationTasks(replicationTasks)
-	c.engine.NotifyNewTimerTasks(timerTasks)
+	c.shard.GetEngine().NotifyNewTransferTasks(transferTasks)
+	c.shard.GetEngine().NotifyNewReplicationTasks(replicationTasks)
+	c.shard.GetEngine().NotifyNewTimerTasks(timerTasks)
 }
 
 func (c *contextImpl) mergeContinueAsNewReplicationTasks(
@@ -1079,9 +1073,9 @@ func (c *contextImpl) ResetWorkflowExecution(
 	baseRunNextEventID int64,
 ) (retError error) {
 
-	now := c.timeSource.Now()
-	currTransferTasks := []persistence.Task{}
-	currTimerTasks := []persistence.Task{}
+	now := c.shard.GetTimeSource().Now()
+	var currTransferTasks []persistence.Task
+	var currTimerTasks []persistence.Task
 	if closeTask != nil {
 		currTransferTasks = append(currTransferTasks, closeTask)
 	}
@@ -1172,15 +1166,12 @@ func (c *contextImpl) ResetWorkflowExecution(
 	}
 
 	resetWFReq := &persistence.ResetWorkflowExecutionRequest{
-		BaseRunID:          baseRunID,
-		BaseRunNextEventID: baseRunNextEventID,
-
-		CurrentRunID:          currMutableState.GetExecutionInfo().RunID,
-		CurrentRunNextEventID: currMutableState.GetExecutionInfo().NextEventID,
-
+		BaseRunID:               baseRunID,
+		BaseRunNextEventID:      baseRunNextEventID,
+		CurrentRunID:            currMutableState.GetExecutionInfo().RunID,
+		CurrentRunNextEventID:   currMutableState.GetExecutionInfo().NextEventID,
 		CurrentWorkflowMutation: nil,
-
-		NewWorkflowSnapshot: *resetWorkflow,
+		NewWorkflowSnapshot:     *resetWorkflow,
 	}
 
 	if updateCurr {
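
These context.go changes drop the engine and timeSource fields that NewContext cached from the shard and instead resolve both through the shard at each call site. A plausible motivation (my reading; the commit does not state one) is that a reference captured at construction can be nil or stale if the shard's engine is created or replaced after workflow contexts already exist, whereas shard.GetEngine() always observes the current engine. A toy illustration with hypothetical types:

package main

import "fmt"

type Engine interface{ NotifyNewTransferTasks(n int) }

type engineImpl struct{ name string }

func (e *engineImpl) NotifyNewTransferTasks(n int) {
	fmt.Printf("%s notified of %d tasks\n", e.name, n)
}

type Shard struct{ engine Engine }

func (s *Shard) GetEngine() Engine { return s.engine }

type workflowContext struct {
	shard *Shard
	// engine Engine // cached style: a reference stored here would be nil,
	// because the engine is installed on the shard only after construction below
}

func (c *workflowContext) notifyTasks(n int) {
	c.shard.GetEngine().NotifyNewTransferTasks(n) // always the shard's current engine
}

func main() {
	shard := &Shard{}
	ctx := &workflowContext{shard: shard}
	shard.engine = &engineImpl{name: "engine-v1"} // set after ctx exists
	ctx.notifyTasks(3)
}
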
service/history/replication/task_processor.go (10 changes: 6 additions & 4 deletions)
@@ -213,10 +213,12 @@ func (p *taskProcessorImpl) cleanupReplicationTaskLoop() {
 			timer.Stop()
 			return
 		case <-timer.C:
-			err := p.cleanupAckedReplicationTasks()
-			if err != nil {
-				p.logger.Error("Failed to clean up replication messages.", tag.Error(err))
-				p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupFailure)
+			if p.config.EnableCleanupReplicationTask() {
+				err := p.cleanupAckedReplicationTasks()
+				if err != nil {
+					p.logger.Error("Failed to clean up replication messages.", tag.Error(err))
+					p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupFailure)
+				}
 			}
 			timer.Reset(backoff.JitDuration(
 				p.config.ReplicationTaskProcessorCleanupInterval(shardID),
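
The flag is evaluated inside the loop, on every tick, rather than once before the loop starts. Combined with the per-call property functions sketched earlier, this means flipping history.EnableCleanupReplicationTask in dynamic config pauses or resumes cleanup without restarting the task processor. A minimal sketch of the loop's shape, where jitDuration is a hypothetical stand-in for backoff.JitDuration:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitDuration adds up to coefficient*base of random jitter so that shards
// do not all run cleanup at the same instant.
func jitDuration(base time.Duration, coefficient float64) time.Duration {
	return base + time.Duration(rand.Float64()*coefficient*float64(base))
}

// cleanupLoop mirrors the shape of cleanupReplicationTaskLoop: enabled is
// checked per tick, so the flag can change while the loop keeps running.
func cleanupLoop(enabled func() bool, interval time.Duration, done <-chan struct{}, cleanup func() error) {
	timer := time.NewTimer(jitDuration(interval, 0.15))
	for {
		select {
		case <-done:
			timer.Stop()
			return
		case <-timer.C:
			if enabled() {
				if err := cleanup(); err != nil {
					fmt.Println("cleanup failed:", err) // the real loop logs and bumps a failure metric
				}
			}
			timer.Reset(jitDuration(interval, 0.15))
		}
	}
}

func main() {
	done := make(chan struct{})
	go cleanupLoop(
		func() bool { return true }, // stand-in for p.config.EnableCleanupReplicationTask()
		50*time.Millisecond,
		done,
		func() error { fmt.Println("cleaned up acked tasks"); return nil },
	)
	time.Sleep(200 * time.Millisecond)
	close(done)
}
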
