Skip to content

Commit

Permalink
Revert "Make persistence config dynamic for system scanner (#3052)" (#…
Browse files Browse the repository at this point in the history
…3067)

This reverts commit 909c329.
  • Loading branch information
andrewjdawson2016 authored Feb 21, 2020
1 parent 909c329 commit c09186f
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 57 deletions.
11 changes: 5 additions & 6 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/uber/cadence/common/persistence/sql"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
)

type (
Expand Down Expand Up @@ -139,7 +138,7 @@ func NewFactory(
logger: logger,
clusterName: clusterName,
}
limiters := buildRateLimiters(cfg)
limiters := buildRatelimiters(cfg)
factory.init(clusterName, limiters)
return factory
}
Expand Down Expand Up @@ -323,18 +322,18 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
f.datastores[storeTypeVisibility] = visibilityDataStore
}

func buildRateLimiters(cfg *config.Persistence) map[string]quotas.Limiter {
func buildRatelimiters(cfg *config.Persistence) map[string]quotas.Limiter {
result := make(map[string]quotas.Limiter, len(cfg.DataStores))
for dsName, ds := range cfg.DataStores {
var qps dynamicconfig.IntPropertyFn
qps := 0
if ds.Cassandra != nil {
qps = ds.Cassandra.MaxQPS
}
if ds.SQL != nil {
qps = ds.SQL.MaxQPS
}
if qps != nil {
result[dsName] = quotas.NewDynamicRateLimiter(func() float64 { return float64(qps()) })
if qps > 0 {
result[dsName] = quotas.NewSimpleRateLimiter(qps)
}
}
return result
Expand Down
4 changes: 2 additions & 2 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type (
// Datacenter is the data center filter arg for cassandra
Datacenter string `yaml:"datacenter"`
// MaxQPS is the max request rate to this datastore
MaxQPS dynamicconfig.IntPropertyFn `yaml:"-" json:"-"`
MaxQPS int `yaml:"maxQPS"`
// MaxConns is the max number of connections to this datastore for a single keyspace
MaxConns int `yaml:"maxConns"`
// TLS configuration
Expand All @@ -211,7 +211,7 @@ type (
// ConnectAttributes is a set of key-value attributes to be sent as part of connect data_source_name url
ConnectAttributes map[string]string `yaml:"connectAttributes"`
// MaxQPS the max request rate on this datastore
MaxQPS dynamicconfig.IntPropertyFn `yaml:"-" json:"-"`
MaxQPS int `yaml:"maxQPS"`
// MaxConns the max number of connections to this datastore
MaxConns int `yaml:"maxConns"`
// MaxIdleConns is the max number of idle connections to this datastore
Expand Down
8 changes: 2 additions & 6 deletions common/service/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@

package config

import (
"fmt"

"github.com/uber/cadence/common/service/dynamicconfig"
)
import "fmt"

const (
// StoreTypeSQL refers to sql based storage as persistence store
Expand All @@ -34,7 +30,7 @@ const (
)

// SetMaxQPS sets the MaxQPS value for the given datastore
func (c *Persistence) SetMaxQPS(key string, qps dynamicconfig.IntPropertyFn) {
func (c *Persistence) SetMaxQPS(key string, qps int) {
ds, ok := c.DataStores[key]
if !ok {
return
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewService(
serviceConfig := NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), params.PersistenceConfig.NumHistoryShards, isAdvancedVisExistInConfig)

params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS)
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS())
params.PersistenceConfig.VisibilityConfig = &config.VisibilityConfig{
VisibilityListMaxQPS: serviceConfig.VisibilityListMaxQPS,
EnableSampling: serviceConfig.EnableVisibilitySampling,
Expand Down
2 changes: 1 addition & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func NewService(
params.PersistenceConfig.IsAdvancedVisibilityConfigExist())

params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS)
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS())
params.PersistenceConfig.VisibilityConfig = &config.VisibilityConfig{
VisibilityOpenMaxQPS: serviceConfig.VisibilityOpenMaxQPS,
VisibilityClosedMaxQPS: serviceConfig.VisibilityClosedMaxQPS,
Expand Down
5 changes: 4 additions & 1 deletion service/matching/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func NewService(
) (resource.Resource, error) {

serviceConfig := NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger))
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS)
params.PersistenceConfig.SetMaxQPS(
params.PersistenceConfig.DefaultStore,
serviceConfig.PersistenceMaxQPS(),
)
serviceResource, err := resource.New(
params,
common.MatchingServiceName,
Expand Down
1 change: 1 addition & 0 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func New(
) *Scanner {

cfg := params.Config
cfg.Persistence.SetMaxQPS(cfg.Persistence.DefaultStore, cfg.PersistenceMaxQPS())
zapLogger, err := zap.NewProduction()
if err != nil {
resource.GetLogger().Fatal("failed to initialize zap logger", tag.Error(err))
Expand Down
65 changes: 26 additions & 39 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ type (
Service struct {
resource.Resource

scannerResource resource.Resource // separate out scanner resource because it requires a different persistence QPS
status int32
stopC chan struct{}
params *service.BootstrapParams
config *Config
status int32
stopC chan struct{}
params *service.BootstrapParams
config *Config
}

// Config contains all the service config for worker
Expand All @@ -77,23 +76,33 @@ func NewService(
) (resource.Resource, error) {

serviceConfig := NewConfig(params)
serviceResource, err := getResource(params, serviceConfig.ThrottledLogRPS, serviceConfig.ReplicationCfg.PersistenceMaxQPS)
if err != nil {
return nil, err
}
scannerResource, err := getResource(params, serviceConfig.ThrottledLogRPS, serviceConfig.ScannerCfg.PersistenceMaxQPS)

params.PersistenceConfig.SetMaxQPS(
params.PersistenceConfig.DefaultStore,
serviceConfig.ReplicationCfg.PersistenceMaxQPS(),
)

serviceResource, err := resource.New(
params,
common.WorkerServiceName,
serviceConfig.ThrottledLogRPS,
func(
persistenceBean persistenceClient.Bean,
logger log.Logger,
) (persistence.VisibilityManager, error) {
return persistenceBean.GetVisibilityManager(), nil
},
)
if err != nil {
return nil, err
}

return &Service{
Resource: serviceResource,

scannerResource: scannerResource,
status: common.DaemonStatusInitialized,
config: serviceConfig,
params: params,
stopC: make(chan struct{}),
status: common.DaemonStatusInitialized,
config: serviceConfig,
params: params,
stopC: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -160,7 +169,6 @@ func (s *Service) Start() {
logger.Info("worker starting", tag.ComponentWorker)

s.Resource.Start()
s.scannerResource.Start()

s.ensureSystemDomainExists()
s.startScanner()
Expand Down Expand Up @@ -194,7 +202,6 @@ func (s *Service) Stop() {
close(s.stopC)

s.Resource.Stop()
s.scannerResource.Stop()

s.params.Logger.Info("worker stopped", tag.ComponentWorker)
}
Expand Down Expand Up @@ -232,7 +239,7 @@ func (s *Service) startScanner() {
Config: *s.config.ScannerCfg,
TallyScope: s.params.MetricScope,
}
if err := scanner.New(s.scannerResource, params).Start(); err != nil {
if err := scanner.New(s.Resource, params).Start(); err != nil {
s.GetLogger().Fatal("error starting scanner", tag.Error(err))
}
}
Expand Down Expand Up @@ -334,23 +341,3 @@ func (s *Service) registerSystemDomain() {
s.GetLogger().Fatal("failed to register system domain", tag.Error(err))
}
}

func getResource(
params *service.BootstrapParams,
throttledLogRPS dynamicconfig.IntPropertyFn,
persistenceMaxQPS dynamicconfig.IntPropertyFn,
) (resource.Resource, error) {

params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, persistenceMaxQPS)
return resource.New(
params,
common.WorkerServiceName,
throttledLogRPS,
func(
persistenceBean persistenceClient.Bean,
logger log.Logger,
) (persistence.VisibilityManager, error) {
return persistenceBean.GetVisibilityManager(), nil
},
)
}
2 changes: 1 addition & 1 deletion tools/cli/domainUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func initializeMetadataMgr(
) persistence.MetadataManager {

pConfig := serviceConfig.Persistence
pConfig.SetMaxQPS(pConfig.DefaultStore, dynamicconfig.GetIntPropertyFn(dependencyMaxQPS))
pConfig.SetMaxQPS(pConfig.DefaultStore, dependencyMaxQPS)
pConfig.VisibilityConfig = &config.VisibilityConfig{
VisibilityListMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(dependencyMaxQPS),
EnableSampling: dynamicconfig.GetBoolPropertyFn(false), // not used by domain operation
Expand Down

0 comments on commit c09186f

Please sign in to comment.