Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added config store functionality (initial implementation) #4357

Merged
merged 57 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
a98f461
added idl api and blob definitions
aliue Jun 23, 2021
65af1c1
Merge branch 'master' into configurationStore
aliue Jun 24, 2021
99aec05
added internal types for configStore API
aliue Jun 24, 2021
aed08c5
added thrift mapper implementation for service API
aliue Jun 24, 2021
38166d1
moved proto API definitions to proto internal
aliue Jun 24, 2021
c5fc1da
added proto mapper for configStore API
aliue Jun 24, 2021
90e0961
added dummy API functions
aliue Jun 24, 2021
6aa2e7a
changed value to datablob type in proto schema
aliue Jun 28, 2021
c9b6f6a
changed internal types and mappers for datablob values
aliue Jun 28, 2021
ef4bd9b
removed db blob definition
aliue Jun 28, 2021
810916e
changed mapper to reflect move of shared DynamicConfig thrift types
aliue Jun 29, 2021
038f940
added blob definition, thrift mapper, and serialize/deserialize for P…
aliue Jun 29, 2021
b6ebe54
updated configStore branch with master
aliue Jul 1, 2021
c196b69
removed get_all and filters field of GetDynamicConfigRequest
aliue Jul 2, 2021
da3a551
added configStore and configStoreManager dummy implementations
aliue Jul 2, 2021
5bb77f9
added configStoreCRUD for nosqlplugin
aliue Jul 2, 2021
f2ba536
started cassandra plugin configStore.go
aliue Jul 2, 2021
06dd59c
added conditional insert for cassandra
aliue Jul 2, 2021
d3587b6
initial persistence layer implementation
aliue Jul 7, 2021
f098dfb
fixed typo
aliue Jul 7, 2021
88a9d8e
added support for config store to persistence clients
aliue Jul 7, 2021
40a7fbc
initial dc client implementation
aliue Jul 19, 2021
f318ab6
moved dc client and finished updateValue impl
aliue Jul 20, 2021
28bf505
fixed UpdateValue for client
aliue Jul 22, 2021
cd83142
fixed exported definitions
aliue Jul 22, 2021
d816107
change to align toward code base conventions and other minor details
aliue Jul 22, 2021
12f4662
added timeouts for fetch and update
aliue Jul 22, 2021
0f80a06
initial tests
aliue Jul 27, 2021
97afde5
merge up to date with master branch
aliue Jul 27, 2021
0eb03d6
conform to conventions with new update
aliue Jul 27, 2021
dd4d58d
added persistence tests (initial set) along with cassandra schema update
aliue Jul 27, 2021
3f1a02d
config store client tests, unresolved map test issue
aliue Jul 30, 2021
5cfabb7
fixed variable naming
aliue Jul 30, 2021
3d82055
added configStore functions to adminHandler
aliue Aug 3, 2021
07481e9
improved admin handler and changed GetDynamicConfigRequest struct
aliue Aug 3, 2021
3f298c0
added cli commands for config store
aliue Aug 5, 2021
87c29da
added dc client unit tests for Restore and List Value, fixed client l…
aliue Aug 5, 2021
d322e6f
shifted configstoreclientconfig to new package, added some adminHandl…
aliue Aug 6, 2021
5f2970f
fixed runtime panic error and improved cli interaction
aliue Aug 10, 2021
820b6d7
made changes to idl schema and fixed restore behavior
aliue Aug 11, 2021
8add81a
fixed client test race conditions
aliue Aug 11, 2021
f9e4309
realized still needed list request and cli get with no filter specifi…
aliue Aug 11, 2021
9f592a3
select dc client impl based on config file rather than hardcoded default
aliue Aug 11, 2021
22f115c
commiting .gen changes
aliue Aug 12, 2021
0c40f8c
change back to official idl repo
aliue Aug 12, 2021
bf6ae43
added back in configStore thrift
aliue Aug 12, 2021
e4a5654
fixed metric test error
aliue Aug 12, 2021
64ff26b
changes made to reflect idl changes
aliue Aug 12, 2021
e11a480
improvements such as backward dc client config compatibility, default…
aliue Aug 13, 2021
ea50165
added case to persistence test base for sql
aliue Aug 13, 2021
9754275
Revert "added case to persistence test base for sql"
aliue Aug 13, 2021
467261a
added skips to config store persistence test for non-Cass dbs
aliue Aug 13, 2021
c9076f5
changes to config yaml files
aliue Aug 13, 2021
6a847fe
bring config store branch up to date with master
aliue Aug 13, 2021
a0874cf
cli flags.go changes
aliue Aug 13, 2021
9748c8d
regenerated mocks
aliue Aug 13, 2021
decf85d
regenerated mocks
aliue Aug 13, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improvements such as backward dc client config compatibility, default…
… config store client values, better adherence to conventions, etc.
  • Loading branch information
aliue committed Aug 13, 2021
commit e11a480d5b21c4b835574d0a71f80e84a3c1af1f
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "idls"]
path = idls
url = https://github.com/uber/cadence-idl.git
branch = configurationStore
branch = master
8 changes: 4 additions & 4 deletions client/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (c *clientImpl) GetDynamicConfig(
if err != nil {
return nil, err
}
ctx, cancel := c.createContextWithLargeTimeout(ctx)
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.GetDynamicConfig(ctx, request, opts...)
}
Expand All @@ -399,7 +399,7 @@ func (c *clientImpl) UpdateDynamicConfig(
if err != nil {
return err
}
ctx, cancel := c.createContextWithLargeTimeout(ctx)
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.UpdateDynamicConfig(ctx, request, opts...)
}
Expand All @@ -414,7 +414,7 @@ func (c *clientImpl) RestoreDynamicConfig(
if err != nil {
return err
}
ctx, cancel := c.createContextWithLargeTimeout(ctx)
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.RestoreDynamicConfig(ctx, request, opts...)
}
Expand All @@ -429,7 +429,7 @@ func (c *clientImpl) ListDynamicConfig(
if err != nil {
return nil, err
}
ctx, cancel := c.createContextWithLargeTimeout(ctx)
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.ListDynamicConfig(ctx, request, opts...)
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/server/cadence/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ func startHandler(c *cli.Context) {
if cfg.Log.Level == "debug" {
log.Printf("config=\n%v\n", cfg.String())
}
cfg.DynamicConfig.FileBased.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfig.FileBased.Filepath)
if cfg.DynamicConfig.Client == "" {
cfg.DynamicConfigClient.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfigClient.Filepath)
} else {
cfg.DynamicConfig.FileBased.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfig.FileBased.Filepath)
}

if err := cfg.ValidateAndFillDefaults(); err != nil {
log.Fatalf("config validation failed: %v", err)
Expand Down
38 changes: 22 additions & 16 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,31 @@ func (s *server) startService() common.Daemon {
params.UpdateLoggerWithServiceName(params.Name)
params.PersistenceConfig = s.cfg.Persistence

switch s.cfg.DynamicConfig.Client {
case dynamicconfig.DynamicConfigConfigStoreClient:
params.DynamicConfig, err = configstore.NewConfigStoreClient(
&s.cfg.DynamicConfig.ConfigStore,
s.cfg.Persistence.DataStores[s.cfg.Persistence.DefaultStore].NoSQL,
params.Logger,
s.doneC,
)
log.Printf("Initializing Config Store Dynamic Config Client\n")
case dynamicconfig.DynamicConfigFileBasedClient:
params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfig.FileBased, params.Logger, s.doneC)
log.Printf("Initializing File Based Dynamic Config Client\n")
default:
params.DynamicConfig = dynamicconfig.NewNopClient()
err = nil
err = nil
if s.cfg.DynamicConfig.Client == "" {
//try to fallback to legacy dynamicClientConfig
params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfigClient, params.Logger, s.doneC)
} else {
switch s.cfg.DynamicConfig.Client {
case dynamicconfig.DynamicConfigConfigStoreClient:
log.Printf("Trying to initialize Config Store Dynamic Config Client\n")
params.DynamicConfig, err = configstore.NewConfigStoreClient(
&s.cfg.DynamicConfig.ConfigStore,
&s.cfg.Persistence,
params.Logger,
s.doneC,
)
case dynamicconfig.DynamicConfigFileBasedClient:
log.Printf("Trying to initialize File Based Dynamic Config Client\n")
params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfig.FileBased, params.Logger, s.doneC)
default:
log.Printf("Trying to initialize Nop Config Client\n")
params.DynamicConfig = dynamicconfig.NewNopClient()
}
}

if err != nil {
log.Printf("error creating config store based dynamic config client, use no-op config client instead. error: %v", err)
log.Printf("error creating dynamic config client, using no-op config client instead. error: %v", err)
params.DynamicConfig = dynamicconfig.NewNopClient()
}
clusterMetadata := s.cfg.ClusterMetadata
Expand Down
12 changes: 8 additions & 4 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/dynamicconfig/configstore/configstoreconfig"
c "github.com/uber/cadence/common/dynamicconfig/configstore/config"
)

type (
Expand All @@ -58,6 +58,10 @@ type (
PublicClient PublicClient `yaml:"publicClient"`
// DynamicConfigClient is the config for setting up the file based dynamic config client
// Filepath would be relative to the root directory when the path wasn't absolute.
// Included for backwards compatibility, please transition to DynamicConfig
DynamicConfigClient dynamicconfig.FileBasedClientConfig `yaml:"dynamicConfigClient"`
// DynamicConfig is the config for setting up all dynamic config clients
// Allows for changes in client without needing code change
DynamicConfig DynamicConfig `yaml:"dynamicconfig"`
// DomainDefaults is the default config for every domain
DomainDefaults DomainDefaults `yaml:"domainDefaults"`
Expand All @@ -73,9 +77,9 @@ type (
}

DynamicConfig struct {
Client string `yaml:"client"`
ConfigStore configstoreconfig.ConfigStoreClientConfig `yaml:"configstore"`
FileBased dynamicconfig.FileBasedClientConfig `yaml:"filebased"`
Client string `yaml:"client"`
ConfigStore c.ClientConfig `yaml:"configstore"`
FileBased dynamicconfig.FileBasedClientConfig `yaml:"filebased"`
}

NoopAuthorizer struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package configstoreconfig
package config

import "time"

// ConfigStoreClientConfig is the config for the config store based dynamic config client.
//This package is necessary to avoid import cycle as config_store_client imports common/config
//while common/config imports this ClientConfig definition

// ClientConfig is the config for the config store based dynamic config client.
// It specifies how often the cached config should be updated by checking underlying database.
type ConfigStoreClientConfig struct {
type ClientConfig struct {
PollInterval time.Duration `yaml:"pollInterval"`
UpdateRetryAttempts int `yaml:"updateRetryAttempts"`
FetchTimeout time.Duration `yaml:"FetchTimeout"`
Expand Down
118 changes: 65 additions & 53 deletions common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -31,7 +31,7 @@ import (

"github.com/uber/cadence/common/config"
dc "github.com/uber/cadence/common/dynamicconfig"
csc "github.com/uber/cadence/common/dynamicconfig/configstore/configstoreconfig"
csc "github.com/uber/cadence/common/dynamicconfig/configstore/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
Expand All @@ -45,10 +45,17 @@ const (
configStoreMinPollInterval = time.Second * 2
)

var defaultConfigValues = &csc.ClientConfig{
PollInterval: time.Second * 10,
UpdateRetryAttempts: 1,
FetchTimeout: 2,
UpdateTimeout: 2,
}

type configStoreClient struct {
values atomic.Value
lastUpdatedTime time.Time
config *csc.ConfigStoreClientConfig
config *csc.ClientConfig
configStoreManager persistence.ConfigStoreManager
doneCh chan struct{}
logger log.Logger
Expand All @@ -66,8 +73,23 @@ type fetchResult struct {
}

// NewConfigStoreClient creates a config store client
func NewConfigStoreClient(clientCfg *csc.ConfigStoreClientConfig, persistenceCfg *config.NoSQL, logger log.Logger, doneCh chan struct{}) (dc.Client, error) {
client, err := newConfigStoreClient(clientCfg, persistenceCfg, logger, doneCh)
func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Persistence, logger log.Logger, doneCh chan struct{}) (dc.Client, error) {
if err := validateClientConfig(clientCfg); err != nil {
logger.Error("Invalid Client Config Values, Using Default Values")
clientCfg = defaultConfigValues
}

if persistenceCfg == nil {
return nil, errors.New("persistence cfg is nil")
} else if persistenceCfg.DefaultStore != "cass-default" {
return nil, errors.New("persistence cfg default store is not Cassandra")
} else if store, ok := persistenceCfg.DataStores[persistenceCfg.DefaultStore]; !ok {
return nil, errors.New("persistence cfg datastores missing Cassandra")
} else if store.NoSQL == nil {
return nil, errors.New("NoSQL struct is nil")
}

client, err := newConfigStoreClient(clientCfg, persistenceCfg.DataStores[persistenceCfg.DefaultStore].NoSQL, logger, doneCh)
if err != nil {
return nil, err
}
Expand All @@ -78,11 +100,7 @@ func NewConfigStoreClient(clientCfg *csc.ConfigStoreClientConfig, persistenceCfg
return client, nil
}

func newConfigStoreClient(clientCfg *csc.ConfigStoreClientConfig, persistenceCfg *config.NoSQL, logger log.Logger, doneCh chan struct{}) (*configStoreClient, error) {
if err := validateConfigStoreClientConfig(clientCfg); err != nil {
return nil, err
}

func newConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.NoSQL, logger log.Logger, doneCh chan struct{}) (*configStoreClient, error) {
store, err := nosql.NewNoSQLConfigStore(*persistenceCfg, logger)
if err != nil {
return nil, err
Expand Down Expand Up @@ -205,20 +223,20 @@ func (csc *configStoreClient) GetDurationValue(
return defaultValue, err
}

durationString, ok := val.(string)
if !ok {
durVal, ok2 := val.(time.Duration)
if ok2 {
return durVal, nil
var durVal time.Duration
switch v := val.(type) {
case string:
durVal, err = time.ParseDuration(v)
if err != nil {
return defaultValue, errors.New("value string encoding cannot be parsed into duration")
}
return defaultValue, errors.New("value type is not string")
case time.Duration:
durVal = v
default:
return defaultValue, errors.New("value type is not duration")
}

durationVal, err := time.ParseDuration(durationString)
if err != nil {
return defaultValue, fmt.Errorf("failed to parse duration: %v", err)
}
return durationVal, nil
return durVal, nil
}

func (csc *configStoreClient) UpdateValue(name dc.Key, value interface{}) error {
Expand Down Expand Up @@ -358,18 +376,21 @@ func (csc *configStoreClient) updateValue(name dc.Key, value interface{}, retryA
},
}

updateCh := make(chan error)
go func() {
updateCh <- csc.configStoreManager.UpdateDynamicConfig(
context.TODO(),
&persistence.UpdateDynamicConfigRequest{
Snapshot: newSnapshot,
},
)
}()
ctx, cancel := context.WithTimeout(context.Background(), csc.config.UpdateTimeout)
defer cancel()

err := csc.configStoreManager.UpdateDynamicConfig(
ctx,
&persistence.UpdateDynamicConfigRequest{
Snapshot: newSnapshot,
},
)

select {
case err := <-updateCh:
case <-ctx.Done():
//potentially we can retry on timeout
return errors.New("timeout error on update")
default:
if err != nil {
if _, ok := err.(*persistence.ConditionFailedError); ok && retryAttempts > 0 {
//fetch new config and retry
Expand All @@ -386,9 +407,6 @@ func (csc *configStoreClient) updateValue(name dc.Key, value interface{}, retryA
return err
}
return nil
case <-time.After(csc.config.UpdateTimeout):
return errors.New("timeout error on update")
//should we retry on timeout errors
}
}

Expand Down Expand Up @@ -455,34 +473,28 @@ func copyDataBlob(blob *types.DataBlob) *types.DataBlob {
}

func (csc *configStoreClient) update() error {
fetchCh := make(chan *fetchResult)
go func() {
res, err := csc.configStoreManager.FetchDynamicConfig(context.TODO())
if res == nil {
fetchCh <- &fetchResult{snapshot: nil, err: err}
} else {
fetchCh <- &fetchResult{snapshot: res.Snapshot, err: err}
}
}()
ctx, cancel := context.WithTimeout(context.Background(), csc.config.FetchTimeout)
defer cancel()

res, err := csc.configStoreManager.FetchDynamicConfig(ctx)

select {
case fetchRes := <-fetchCh:
if fetchRes.err != nil {
return fmt.Errorf("failed to fetch dynamic config snapshot %v", fetchRes.err)
case <-ctx.Done():
return errors.New("timeout error on fetch")
default:
if err != nil {
return fmt.Errorf("failed to fetch dynamic config snapshot %v", err)
}

if fetchRes.snapshot != nil {
if res != nil && res.Snapshot != nil {
defer func() {
csc.lastUpdatedTime = time.Now()
}()

return csc.storeValues(fetchRes.snapshot)
return csc.storeValues(res.Snapshot)
}

return nil
case <-time.After(csc.config.FetchTimeout):
return errors.New("timeout error on fetch")
}
return nil
}

func (csc *configStoreClient) storeValues(snapshot *persistence.DynamicConfigSnapshot) error {
Expand Down Expand Up @@ -557,7 +569,7 @@ func matchFilters(dcValue *types.DynamicConfigValue, filters map[dc.Filter]inter
return true
}

func validateConfigStoreClientConfig(config *csc.ConfigStoreClientConfig) error {
func validateClientConfig(config *csc.ClientConfig) error {
if config == nil {
return errors.New("no config found for config store based dynamic config client")
}
Expand Down
Loading