Skip to content

Commit

Permalink
Fix race condition in integration test (uber#2871)
Browse files Browse the repository at this point in the history
* fix race condition for archiver provider

* fix race condition for persistence config

* pr comments

* fix test

* fix race condition for archiver provide in another way
  • Loading branch information
yycptt authored Nov 27, 2019
1 parent 41f1acc commit dc5f154
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
3 changes: 3 additions & 0 deletions common/archiver/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func (p *archiverProvider) RegisterBootstrapContainer(
historyContainer *archiver.HistoryBootstrapContainer,
visibilityContainter *archiver.VisibilityBootstrapContainer,
) error {
p.Lock()
defer p.Unlock()

if _, ok := p.historyContainers[serviceName]; ok && historyContainer != nil {
return ErrBootstrapContainerAlreadyRegistered
}
Expand Down
55 changes: 49 additions & 6 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package host

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -401,13 +402,19 @@ func (c *cadenceImpl) startFrontend(hosts map[string][]string, startWG *sync.Wai
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.MessagingClient = c.messagingClient
params.PersistenceConfig = c.persistenceConfig
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient())
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider
params.ESConfig = c.esConfig
params.ESClient = c.esClient

var err error
params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for frontend", tag.Error(err))
}

if c.esConfig != nil {
esDataStoreName := "es-visibility"
params.PersistenceConfig.AdvancedVisibilityStore = esDataStoreName
Expand Down Expand Up @@ -459,17 +466,21 @@ func (c *cadenceImpl) startHistory(
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.MessagingClient = c.messagingClient
params.PersistenceConfig = c.persistenceConfig
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = dynamicconfig.NewNopClient()
dispatcher, err := params.DispatcherProvider.Get(common.FrontendServiceName, c.FrontendAddress())
if err != nil {
c.logger.Fatal("Failed to get dispatcher for frontend", tag.Error(err))
c.logger.Fatal("Failed to get dispatcher for history", tag.Error(err))
}
params.PublicClient = cwsc.New(dispatcher.ClientConfig(common.FrontendServiceName))
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider

params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for history", tag.Error(err))
}

service := service.New(params)
c.historyService = service
hConfig := c.historyConfig
Expand Down Expand Up @@ -557,12 +568,17 @@ func (c *cadenceImpl) startMatching(hosts map[string][]string, startWG *sync.Wai
params.MembershipFactory = newMembershipFactory(params.Name, hosts)
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.PersistenceConfig = c.persistenceConfig
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient())
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider

var err error
params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for matching", tag.Error(err))
}

matchingService, err := matching.NewService(params)
if err != nil {
params.Logger.Fatal("unable to start matching service", tag.Error(err))
Expand Down Expand Up @@ -594,15 +610,20 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG
params.MembershipFactory = newMembershipFactory(params.Name, hosts)
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.PersistenceConfig = c.persistenceConfig
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient())
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider

var err error
params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for worker", tag.Error(err))
}

dispatcher, err := params.DispatcherProvider.Get(common.FrontendServiceName, c.FrontendAddress())
if err != nil {
c.logger.Fatal("Failed to get dispatcher for frontend", tag.Error(err))
c.logger.Fatal("Failed to get dispatcher for worker", tag.Error(err))
}
params.PublicClient = cwsc.New(dispatcher.ClientConfig(common.FrontendServiceName))
service := service.New(params)
Expand Down Expand Up @@ -742,6 +763,28 @@ func (c *cadenceImpl) GetExecutionManagerFactory() persistence.ExecutionManagerF
return c.executionMgrFactory
}

// copyPersistenceConfig makes a deepcopy of persistence config.
// This is just a temp fix for the race condition of persistence config.
// The race condition happens because all the services are using the same datastore map in the config.
// Also all services will retry to modify the maxQPS field in the datastore during start up and use the modified maxQPS value to create a persistence factory.
func copyPersistenceConfig(pConfig config.Persistence) (config.Persistence, error) {
copiedDataStores := make(map[string]config.DataStore)
for name, value := range pConfig.DataStores {
copiedDataStore := config.DataStore{}
encodedDataStore, err := json.Marshal(value)
if err != nil {
return pConfig, err
}

if err = json.Unmarshal(encodedDataStore, &copiedDataStore); err != nil {
return pConfig, err
}
copiedDataStores[name] = copiedDataStore
}
pConfig.DataStores = copiedDataStores
return pConfig, nil
}

func newMembershipFactory(serviceName string, hosts map[string][]string) service.MembershipMonitorFactory {
return &membershipFactoryImpl{
serviceName: serviceName,
Expand Down

0 comments on commit dc5f154

Please sign in to comment.