diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 57b9eb63a09..c46aee7b232 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1672,6 +1672,13 @@ const ( // Default value: false // Allowed filters: DomainID MatchingEnableTaskInfoLogByDomainID + // MatchingEnableTasklistGuardAgainstOwnershipShardLoss + // enables guards to prevent tasklists from processing if there is any detection that the host + // no longer is active or owns the shard + // KeyName: matching.enableTasklistGuardAgainstOwnershipLoss + // Value type: Bool + // Default value: false + MatchingEnableTasklistGuardAgainstOwnershipShardLoss // key for history @@ -4116,6 +4123,11 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "MatchingEnableTaskInfoLogByDomainID is enables info level logs for decision/activity task based on the request domainID", DefaultValue: false, }, + MatchingEnableTasklistGuardAgainstOwnershipShardLoss: { + KeyName: "matching.enableTasklistGuardAgainstOwnershipLoss", + Description: "allows guards to ensure that tasklists don't continue processing if there's signal that they've lost ownership", + DefaultValue: false, + }, EventsCacheGlobalEnable: { KeyName: "history.eventsCacheGlobalEnable", Description: "EventsCacheGlobalEnable is enables global cache over all history shards", diff --git a/common/errors/taskListNotOwnedByHostError.go b/common/errors/taskListNotOwnedByHostError.go index 15a8ba21cce..b1c4a566a82 100644 --- a/common/errors/taskListNotOwnedByHostError.go +++ b/common/errors/taskListNotOwnedByHostError.go @@ -24,21 +24,21 @@ package errors import "fmt" -var _ error = &TaskListNotOwnnedByHostError{} +var _ error = &TaskListNotOwnedByHostError{} -type TaskListNotOwnnedByHostError struct { +type TaskListNotOwnedByHostError struct { OwnedByIdentity string MyIdentity string TasklistName string } -func (m *TaskListNotOwnnedByHostError) Error() string { +func (m *TaskListNotOwnedByHostError) Error() string { return fmt.Sprintf("task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s", m.OwnedByIdentity, m.MyIdentity, m.TasklistName) } -func NewTaskListNotOwnnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnnedByHostError { - return &TaskListNotOwnnedByHostError{ +func NewTaskListNotOwnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnedByHostError { + return &TaskListNotOwnedByHostError{ OwnedByIdentity: ownedByIdentity, MyIdentity: myIdentity, TasklistName: tasklistName, diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index a0d3b762017..00aa2b21233 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -956,6 +956,12 @@ func VisibilityQuery(query string) Tag { return newStringTag("visibility-query", query) } +// MembershipChangeEvent is a predefined tag for when logging hashring change events, +// expected to be of type membership.ChangeEvent +func MembershipChangeEvent(event interface{}) Tag { + return newPredefinedDynamicTag("membership-change-event", event) +} + // Dynamic Uses reflection based logging for arbitrary values // for not very performant logging func Dynamic(key string, v interface{}) Tag { diff --git a/common/membership/resolver.go b/common/membership/resolver.go index 8afb0347d55..17d060fb2a4 100644 --- a/common/membership/resolver.go +++ b/common/membership/resolver.go @@ -25,6 +25,7 @@ package membership import ( "fmt" + "sync" "sync/atomic" "github.com/uber/cadence/common" @@ -84,6 +85,7 @@ type MultiringResolver struct { status int32 provider PeerProvider + mu sync.Mutex rings map[string]*ring } @@ -110,6 +112,7 @@ func NewMultiringResolver( provider: provider, rings: make(map[string]*ring), metrics: metricsClient, + mu: sync.Mutex{}, } for _, s := range services { @@ -130,6 +133,8 @@ func (rpo *MultiringResolver) Start() { rpo.provider.Start() + rpo.mu.Lock() + defer rpo.mu.Unlock() for _, ring := range rpo.rings { ring.Start() } @@ -145,6 +150,8 @@ func (rpo *MultiringResolver) Stop() { return } + rpo.mu.Lock() + defer rpo.mu.Unlock() for _, ring := range rpo.rings { ring.Stop() } @@ -163,6 +170,8 @@ func (rpo *MultiringResolver) EvictSelf() error { } func (rpo *MultiringResolver) getRing(service string) (*ring, error) { + rpo.mu.Lock() + defer rpo.mu.Unlock() ring, found := rpo.rings[service] if !found { return nil, fmt.Errorf("service %q is not tracked by Resolver", service) diff --git a/service/matching/config/config.go b/service/matching/config/config.go index 1e1e02184ab..a4c9b20abcd 100644 --- a/service/matching/config/config.go +++ b/service/matching/config/config.go @@ -81,6 +81,8 @@ type ( TaskDispatchRPSTTL time.Duration // task gc configuration MaxTimeBetweenTaskDeletes time.Duration + + EnableTasklistOwnershipGuard dynamicconfig.BoolPropertyFn } ForwarderConfig struct { @@ -158,6 +160,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation), AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()), AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout), + EnableTasklistOwnershipGuard: dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss), LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime), LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime), HostName: hostName, diff --git a/service/matching/config/config_test.go b/service/matching/config/config_test.go index d6073776930..9924a534bec 100644 --- a/service/matching/config/config_test.go +++ b/service/matching/config/config_test.go @@ -79,6 +79,7 @@ func TestNewConfig(t *testing.T) { "TaskDispatchRPS": {nil, 100000.0}, "TaskDispatchRPSTTL": {nil, time.Minute}, "MaxTimeBetweenTaskDeletes": {nil, time.Second}, + "EnableTasklistOwnershipGuard": {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false}, } client := dynamicconfig.NewInMemoryClient() for fieldName, expected := range fields { diff --git a/service/matching/handler/engine.go b/service/matching/handler/engine.go index a66cc5352fa..d685ae6189e 100644 --- a/service/matching/handler/engine.go +++ b/service/matching/handler/engine.go @@ -78,6 +78,8 @@ type ( } matchingEngineImpl struct { + shutdownCompletion *sync.WaitGroup + shutdown chan struct{} taskManager persistence.TaskManager clusterMetadata cluster.Metadata historyService history.Client @@ -120,7 +122,8 @@ var ( var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented // NewEngine creates an instance of matching engine -func NewEngine(taskManager persistence.TaskManager, +func NewEngine( + taskManager persistence.TaskManager, clusterMetadata cluster.Metadata, historyService history.Client, matchingClient matching.Client, @@ -132,7 +135,10 @@ func NewEngine(taskManager persistence.TaskManager, partitioner partition.Partitioner, timeSource clock.TimeSource, ) Engine { + e := &matchingEngineImpl{ + shutdown: make(chan struct{}), + shutdownCompletion: &sync.WaitGroup{}, taskManager: taskManager, clusterMetadata: clusterMetadata, historyService: historyService, @@ -149,19 +155,24 @@ func NewEngine(taskManager persistence.TaskManager, partitioner: partitioner, timeSource: timeSource, } + + e.shutdownCompletion.Add(1) + go e.subscribeToMembershipChanges() + e.waitForQueryResultFn = e.waitForQueryResult return e } func (e *matchingEngineImpl) Start() { - // As task lists are initialized lazily nothing is done on startup at this point. } func (e *matchingEngineImpl) Stop() { + close(e.shutdown) // Executes Stop() on each task list outside of lock for _, l := range e.getTaskLists(math.MaxInt32) { l.Stop() } + e.shutdownCompletion.Wait() } func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager { @@ -200,26 +211,9 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t } e.taskListsLock.RUnlock() - // Defensive check to make sure we actually own the task list - // If we try to create a task list manager for a task list that is not owned by us, return an error - // The new task list manager will steal the task list from the current owner, which should only happen if - // the task list is owned by the current host. - taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName()) + err := e.errIfShardLoss(taskList) if err != nil { - return nil, fmt.Errorf("failed to lookup task list owner: %w", err) - } - - self, err := e.membershipResolver.WhoAmI() - if err != nil { - return nil, fmt.Errorf("failed to lookup self im membership: %w", err) - } - - if taskListOwner.Identity() != self.Identity() { - return nil, cadence_errors.NewTaskListNotOwnnedByHostError( - taskListOwner.Identity(), - self.Identity(), - taskList.GetName(), - ) + return nil, err } // If it gets here, write lock and check again in case a task list is created between the two locks @@ -1202,6 +1196,64 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog( } } +func (e *matchingEngineImpl) errIfShardLoss(taskList *tasklist.Identifier) error { + if !e.config.EnableTasklistOwnershipGuard() { + return nil + } + + self, err := e.membershipResolver.WhoAmI() + if err != nil { + return fmt.Errorf("failed to lookup self im membership: %w", err) + } + + if e.isShuttingDown() { + e.logger.Warn("request to get tasklist is being rejected because engine is shutting down", + tag.WorkflowDomainID(taskList.GetDomainID()), + tag.WorkflowTaskListType(taskList.GetType()), + tag.WorkflowTaskListName(taskList.GetName()), + ) + + return cadence_errors.NewTaskListNotOwnedByHostError( + "not known", + self.Identity(), + taskList.GetName(), + ) + } + + // Defensive check to make sure we actually own the task list + // If we try to create a task list manager for a task list that is not owned by us, return an error + // The new task list manager will steal the task list from the current owner, which should only happen if + // the task list is owned by the current host. + taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName()) + if err != nil { + return fmt.Errorf("failed to lookup task list owner: %w", err) + } + + if taskListOwner.Identity() != self.Identity() { + e.logger.Warn("Request to get tasklist is being rejected because engine does not own this shard", + tag.WorkflowDomainID(taskList.GetDomainID()), + tag.WorkflowTaskListType(taskList.GetType()), + tag.WorkflowTaskListName(taskList.GetName()), + ) + return cadence_errors.NewTaskListNotOwnedByHostError( + taskListOwner.Identity(), + self.Identity(), + taskList.GetName(), + ) + } + + return nil +} + +func (e *matchingEngineImpl) isShuttingDown() bool { + select { + case <-e.shutdown: + return true + default: + return false + } +} + func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) { m.Lock() defer m.Unlock() diff --git a/service/matching/handler/engine_integration_test.go b/service/matching/handler/engine_integration_test.go index a3e613c60e8..73c3201a1e9 100644 --- a/service/matching/handler/engine_integration_test.go +++ b/service/matching/handler/engine_integration_test.go @@ -34,7 +34,6 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" "go.uber.org/yarpc" @@ -45,7 +44,6 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/dynamicconfig" - cadence_errors "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -55,6 +53,7 @@ import ( "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/partition" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" "github.com/uber/cadence/service/matching/tasklist" @@ -131,6 +130,7 @@ func (s *matchingEngineSuite) SetupTest() { s.mockMembershipResolver = membership.NewMockResolver(s.controller) s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes() s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes() s.mockIsolationStore = dynamicconfig.NewMockClient(s.controller) dcClient := dynamicconfig.NewInMemoryClient() dcClient.UpdateValue(dynamicconfig.EnableTasklistIsolation, true) @@ -1303,58 +1303,6 @@ func (s *matchingEngineSuite) TestConfigDefaultHostName() { s.EqualValues(configEmpty.HostName, "") } -func (s *matchingEngineSuite) TestGetTaskListManager_OwnerShip() { - testCases := []struct { - name string - lookUpResult string - lookUpErr error - whoAmIResult string - whoAmIErr error - - expectedError error - }{ - { - name: "Not owned by current host", - lookUpResult: "A", - whoAmIResult: "B", - expectedError: new(cadence_errors.TaskListNotOwnnedByHostError), - }, - { - name: "LookupError", - lookUpErr: assert.AnError, - expectedError: assert.AnError, - }, - { - name: "WhoAmIError", - whoAmIErr: assert.AnError, - expectedError: assert.AnError, - }, - } - - for _, tc := range testCases { - s.T().Run(tc.name, func(t *testing.T) { - resolverMock := membership.NewMockResolver(s.controller) - s.matchingEngine.membershipResolver = resolverMock - - resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return( - membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr, - ).AnyTimes() - resolverMock.EXPECT().WhoAmI().Return( - membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr, - ).AnyTimes() - - taskListKind := types.TaskListKindNormal - - _, err := s.matchingEngine.getTaskListManager( - tasklist.NewTestTaskListID(s.T(), "domain", "tasklist", persistence.TaskListTypeActivity), - &taskListKind, - ) - - assert.ErrorAs(s.T(), err, &tc.expectedError) - }) - } -} - func newActivityTaskScheduledEvent(eventID int64, decisionTaskCompletedEventID int64, scheduleAttributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent { historyEvent := newHistoryEvent(eventID, types.EventTypeActivityTaskScheduled) @@ -1402,6 +1350,7 @@ func defaultTestConfig() *config.Config { config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10) config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) config.MaxTimeBetweenTaskDeletes = time.Duration(0) + config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { return true } return config } diff --git a/service/matching/handler/engine_test.go b/service/matching/handler/engine_test.go index 87c84088362..66004d08c84 100644 --- a/service/matching/handler/engine_test.go +++ b/service/matching/handler/engine_test.go @@ -26,6 +26,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "github.com/golang/mock/gomock" @@ -36,8 +37,10 @@ import ( "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" "github.com/uber/cadence/service/matching/tasklist" @@ -617,3 +620,108 @@ func TestWaitForQueryResult(t *testing.T) { }) } } + +func TestIsShuttingDown(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(0) + e := matchingEngineImpl{ + shutdownCompletion: &wg, + shutdown: make(chan struct{}), + } + e.Start() + assert.False(t, e.isShuttingDown()) + e.Stop() + assert.True(t, e.isShuttingDown()) +} + +func TestGetTasklistsNotOwned(t *testing.T) { + + ctrl := gomock.NewController(t) + resolver := membership.NewMockResolver(ctrl) + + resolver.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("self", "host123", nil), nil) + + tl1, _ := tasklist.NewIdentifier("", "tl1", 0) + tl2, _ := tasklist.NewIdentifier("", "tl2", 0) + tl3, _ := tasklist.NewIdentifier("", "tl3", 0) + + tl1m := tasklist.NewMockManager(ctrl) + tl2m := tasklist.NewMockManager(ctrl) + tl3m := tasklist.NewMockManager(ctrl) + + resolver.EXPECT().Lookup(service.Matching, tl1.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl2.GetName()).Return(membership.NewDetailedHostInfo("", "host456", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl3.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + + e := matchingEngineImpl{ + shutdown: make(chan struct{}), + membershipResolver: resolver, + taskListsLock: sync.RWMutex{}, + taskLists: map[tasklist.Identifier]tasklist.Manager{ + *tl1: tl1m, + *tl2: tl2m, + *tl3: tl3m, + }, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + logger: loggerimpl.NewNopLogger(), + } + + tls, err := e.getNonOwnedTasklistsLocked() + assert.NoError(t, err) + + assert.Equal(t, []tasklist.Manager{tl2m}, tls) +} + +func TestShutDownTasklistsNotOwned(t *testing.T) { + + ctrl := gomock.NewController(t) + resolver := membership.NewMockResolver(ctrl) + + resolver.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("self", "host123", nil), nil) + + tl1, _ := tasklist.NewIdentifier("", "tl1", 0) + tl2, _ := tasklist.NewIdentifier("", "tl2", 0) + tl3, _ := tasklist.NewIdentifier("", "tl3", 0) + + tl1m := tasklist.NewMockManager(ctrl) + tl2m := tasklist.NewMockManager(ctrl) + tl3m := tasklist.NewMockManager(ctrl) + + resolver.EXPECT().Lookup(service.Matching, tl1.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl2.GetName()).Return(membership.NewDetailedHostInfo("", "host456", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl3.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + + e := matchingEngineImpl{ + shutdown: make(chan struct{}), + membershipResolver: resolver, + taskListsLock: sync.RWMutex{}, + taskLists: map[tasklist.Identifier]tasklist.Manager{ + *tl1: tl1m, + *tl2: tl2m, + *tl3: tl3m, + }, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + metricsClient: metrics.NewNoopMetricsClient(), + logger: loggerimpl.NewNopLogger(), + } + + wg := sync.WaitGroup{} + + wg.Add(1) + + tl2m.EXPECT().TaskListID().Return(tl2).AnyTimes() + tl2m.EXPECT().String().AnyTimes() + + tl2m.EXPECT().Stop().Do(func() { + wg.Done() + }) + + err := e.shutDownNonOwnedTasklists() + wg.Wait() + + assert.NoError(t, err) +} diff --git a/service/matching/handler/interfaces.go b/service/matching/handler/interfaces.go index 4c82fa334b3..21f39f160e0 100644 --- a/service/matching/handler/interfaces.go +++ b/service/matching/handler/interfaces.go @@ -34,7 +34,8 @@ import ( type ( // Engine exposes interfaces for clients to poll for activity and decision tasks. Engine interface { - Stop() + common.Daemon + AddDecisionTask(hCtx *handlerContext, request *types.AddDecisionTaskRequest) (syncMatch bool, err error) AddActivityTask(hCtx *handlerContext, request *types.AddActivityTaskRequest) (syncMatch bool, err error) PollForDecisionTask(hCtx *handlerContext, request *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error) diff --git a/service/matching/handler/interfaces_mock.go b/service/matching/handler/interfaces_mock.go index e94141e872f..78401a46838 100644 --- a/service/matching/handler/interfaces_mock.go +++ b/service/matching/handler/interfaces_mock.go @@ -206,6 +206,18 @@ func (mr *MockEngineMockRecorder) RespondQueryTaskCompleted(hCtx, request interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondQueryTaskCompleted", reflect.TypeOf((*MockEngine)(nil).RespondQueryTaskCompleted), hCtx, request) } +// Start mocks base method. +func (m *MockEngine) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockEngineMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockEngine)(nil).Start)) +} + // Stop mocks base method. func (m *MockEngine) Stop() { m.ctrl.T.Helper() diff --git a/service/matching/handler/membership.go b/service/matching/handler/membership.go new file mode 100644 index 00000000000..88964d19dd7 --- /dev/null +++ b/service/matching/handler/membership.go @@ -0,0 +1,150 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package handler + +import ( + "fmt" + "sync" + + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/service/matching/tasklist" +) + +const subscriptionBufferSize = 1000 + +// Because there's a bunch of conditions under which matching may be holding a tasklist +// reader daemon and other live procesess but when it doesn't (according to the rest of the hashring) +// own the tasklist anymore, this listener watches for membership changes and purges anything disused +// in the hashring on membership changes. +// +// Combined with the guard on tasklist instantiation, it should prevent incorrect or poorly timed +// creating of tasklist ownership and database shard thrashing between hosts while they figure out +// which host is the real owner of the tasklist. +// +// This is not the main shutdown process, its just an optimization. +func (e *matchingEngineImpl) subscribeToMembershipChanges() { + defer func() { + if r := recover(); r != nil { + e.logger.Error("matching membership watcher changes caused a panic, recovering", tag.Dynamic("recovered-panic", r)) + } + }() + + defer e.shutdownCompletion.Done() + + if !e.config.EnableTasklistOwnershipGuard() { + return + } + + listener := make(chan *membership.ChangedEvent, subscriptionBufferSize) + e.membershipResolver.Subscribe(service.Matching, "matching-engine", listener) + + for { + select { + case event := <-listener: + err := e.shutDownNonOwnedTasklists() + if err != nil { + e.logger.Error("Error while trying to determine if tasklists have been shutdown", + tag.Error(err), + tag.MembershipChangeEvent(event), + ) + } + case <-e.shutdown: + return + } + } +} + +func (e *matchingEngineImpl) shutDownNonOwnedTasklists() error { + if !e.config.EnableTasklistOwnershipGuard() { + return nil + } + noLongerOwned, err := e.getNonOwnedTasklistsLocked() + if err != nil { + return err + } + + tasklistsShutdownWG := sync.WaitGroup{} + + for _, tl := range noLongerOwned { + // for each of the tasklists that are no longer owned, kick off the + // process of stopping them. The stopping process is IO heavy and + // can take a while, so do them in parallel to efficiently unload tasklists not owned + tasklistsShutdownWG.Add(1) + go func(tl tasklist.Manager) { + + defer func() { + if r := recover(); r != nil { + e.logger.Error("panic occurred while trying to shut down tasklist", tag.Dynamic("recovered-panic", r)) + } + }() + defer tasklistsShutdownWG.Done() + + e.logger.Info("shutting down tasklist preemptively because they are no longer owned by this host", + tag.WorkflowTaskListType(tl.TaskListID().GetType()), + tag.WorkflowTaskListName(tl.TaskListID().GetName()), + tag.WorkflowDomainID(tl.TaskListID().GetDomainID()), + tag.Dynamic("tasklist-debug-info", tl.String()), + ) + + e.unloadTaskList(tl) + }(tl) + } + + tasklistsShutdownWG.Wait() + + return nil +} + +func (e *matchingEngineImpl) getNonOwnedTasklistsLocked() ([]tasklist.Manager, error) { + if !e.config.EnableTasklistOwnershipGuard() { + return nil, nil + } + + var toShutDown []tasklist.Manager + + e.taskListsLock.RLock() + defer e.taskListsLock.RUnlock() + + self, err := e.membershipResolver.WhoAmI() + if err != nil { + return nil, fmt.Errorf("failed to lookup self im membership: %w", err) + } + + for tl, manager := range e.taskLists { + taskListOwner, err := e.membershipResolver.Lookup(service.Matching, tl.GetName()) + if err != nil { + return nil, fmt.Errorf("failed to lookup task list owner: %w", err) + } + + if taskListOwner.Identity() != self.Identity() { + toShutDown = append(toShutDown, manager) + } + } + + e.logger.Info("Got list of non-owned-tasklists", + tag.Dynamic("tasklist-debug-info", toShutDown), + ) + return toShutDown, nil +} diff --git a/service/matching/handler/membership_test.go b/service/matching/handler/membership_test.go new file mode 100644 index 00000000000..2fcdf50d5d8 --- /dev/null +++ b/service/matching/handler/membership_test.go @@ -0,0 +1,307 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package handler + +import ( + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" + + "github.com/uber/cadence/client/history" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/dynamicconfig" + cadence_errors "github.com/uber/cadence/common/errors" + "github.com/uber/cadence/common/log/loggerimpl" + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/matching/config" + "github.com/uber/cadence/service/matching/tasklist" +) + +func TestGetTaskListManager_OwnerShip(t *testing.T) { + + testCases := []struct { + name string + lookUpResult string + lookUpErr error + whoAmIResult string + whoAmIErr error + tasklistGuardEnabled bool + + expectedError error + }{ + { + name: "Not owned by current host", + lookUpResult: "A", + whoAmIResult: "B", + tasklistGuardEnabled: true, + + expectedError: new(cadence_errors.TaskListNotOwnedByHostError), + }, + { + name: "LookupError", + lookUpErr: assert.AnError, + tasklistGuardEnabled: true, + expectedError: assert.AnError, + }, + { + name: "WhoAmIError", + whoAmIErr: assert.AnError, + tasklistGuardEnabled: true, + expectedError: assert.AnError, + }, + { + name: "when feature is not enabled, expect previous behaviour to continue", + lookUpResult: "A", + whoAmIResult: "B", + tasklistGuardEnabled: false, + + expectedError: nil, + }, + } + + for _, tc := range testCases { + + t.Run(tc.name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + logger := loggerimpl.NewNopLogger() + + mockTimeSource := clock.NewMockedTimeSourceAt(time.Now()) + taskManager := tasklist.NewTestTaskManager(t, logger, mockTimeSource) + mockHistoryClient := history.NewMockClient(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + resolverMock := membership.NewMockResolver(ctrl) + resolverMock.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes() + + // this is only if the call goes through + mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes() + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes() + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(matchingTestDomainName, nil).AnyTimes() + + config := defaultTestConfig() + taskListEnabled := tc.tasklistGuardEnabled + config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { + return taskListEnabled + } + + matchingEngine := NewEngine( + taskManager, + cluster.GetTestClusterMetadata(true), + mockHistoryClient, + nil, + config, + logger, + metrics.NewClient(tally.NoopScope, metrics.Matching), + mockDomainCache, + resolverMock, + nil, + mockTimeSource, + ).(*matchingEngineImpl) + + resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return( + membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr, + ).AnyTimes() + resolverMock.EXPECT().WhoAmI().Return( + membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr, + ).AnyTimes() + + taskListKind := types.TaskListKindNormal + + _, err := matchingEngine.getTaskListManager( + tasklist.NewTestTaskListID(t, "domain", "tasklist", persistence.TaskListTypeActivity), + &taskListKind, + ) + if tc.expectedError != nil { + assert.ErrorAs(t, err, &tc.expectedError) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMembershipSubscriptionShutdown(t *testing.T) { + assert.NotPanics(t, func() { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Times(1) + + e := matchingEngineImpl{ + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + go func() { + time.Sleep(time.Second) + close(e.shutdown) + }() + e.subscribeToMembershipChanges() + }) +} + +func TestMembershipSubscriptionPanicHandling(t *testing.T) { + assert.NotPanics(t, func() { + ctrl := gomock.NewController(t) + + r := resource.NewTest(t, ctrl, 0) + r.MembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).DoAndReturn(func(_, _, _ any) { + panic("a panic has occurred") + }) + + e := matchingEngineImpl{ + membershipResolver: r.MembershipResolver, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + logger: loggerimpl.NewNopLogger(), + shutdown: make(chan struct{}), + } + + e.subscribeToMembershipChanges() + }) +} + +func TestSubscriptionAndShutdown(t *testing.T) { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + shutdownWG := &sync.WaitGroup{} + shutdownWG.Add(1) + + e := matchingEngineImpl{ + shutdownCompletion: shutdownWG, + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + // anytimes here because this is quite a racy test and the actual assertions for the unsubscription logic will be separated out + m.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("host2", "host2", nil), nil).AnyTimes() + m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do( + func(service string, name string, inc chan<- *membership.ChangedEvent) { + m := membership.ChangedEvent{ + HostsAdded: nil, + HostsUpdated: nil, + HostsRemoved: []string{"host123"}, + } + inc <- &m + }) + + go func() { + // then call stop so the test can finish + time.Sleep(time.Second) + e.Stop() + }() + + e.subscribeToMembershipChanges() +} + +func TestSubscriptionAndErrorReturned(t *testing.T) { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + shutdownWG := sync.WaitGroup{} + shutdownWG.Add(1) + + e := matchingEngineImpl{ + shutdownCompletion: &shutdownWG, + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + // this should trigger the error case on a membership event + m.EXPECT().WhoAmI().Return(membership.HostInfo{}, assert.AnError).AnyTimes() + + m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do( + func(service string, name string, inc chan<- *membership.ChangedEvent) { + m := membership.ChangedEvent{ + HostsAdded: nil, + HostsUpdated: nil, + HostsRemoved: []string{"host123"}, + } + inc <- &m + }) + + go func() { + // then call stop so the test can finish + time.Sleep(time.Second) + e.Stop() + }() + + e.subscribeToMembershipChanges() +} + +func TestGetTasklistManagerShutdownScenario(t *testing.T) { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + self := membership.NewDetailedHostInfo("self", "self", nil) + + m.EXPECT().WhoAmI().Return(self, nil).AnyTimes() + + shutdownWG := sync.WaitGroup{} + shutdownWG.Add(0) + + e := matchingEngineImpl{ + shutdownCompletion: &shutdownWG, + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + // set this engine to be shutting down so as to trigger the tasklistGetTasklistByID guard + e.Stop() + + tl, _ := tasklist.NewIdentifier("domainid", "tl", 0) + kind := types.TaskListKindNormal + res, err := e.getTaskListManager(tl, &kind) + assertErr := &cadence_errors.TaskListNotOwnedByHostError{} + assert.ErrorAs(t, err, &assertErr) + assert.Nil(t, res) +}