Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added config store functionality (initial implementation) #4357

Merged
merged 57 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
a98f461
added idl api and blob definitions
aliue Jun 23, 2021
65af1c1
Merge branch 'master' into configurationStore
aliue Jun 24, 2021
99aec05
added internal types for configStore API
aliue Jun 24, 2021
aed08c5
added thrift mapper implementation for service API
aliue Jun 24, 2021
38166d1
moved proto API definitions to proto internal
aliue Jun 24, 2021
c5fc1da
added proto mapper for configStore API
aliue Jun 24, 2021
90e0961
added dummy API functions
aliue Jun 24, 2021
6aa2e7a
changed value to datablob type in proto schema
aliue Jun 28, 2021
c9b6f6a
changed internal types and mappers for datablob values
aliue Jun 28, 2021
ef4bd9b
removed db blob definition
aliue Jun 28, 2021
810916e
changed mapper to reflect move of shared DynamicConfig thrift types
aliue Jun 29, 2021
038f940
added blob definition, thrift mapper, and serialize/deserialize for P…
aliue Jun 29, 2021
b6ebe54
updated configStore branch with master
aliue Jul 1, 2021
c196b69
removed get_all and filters field of GetDynamicConfigRequest
aliue Jul 2, 2021
da3a551
added configStore and configStoreManager dummy implementations
aliue Jul 2, 2021
5bb77f9
added configStoreCRUD for nosqlplugin
aliue Jul 2, 2021
f2ba536
started cassandra plugin configStore.go
aliue Jul 2, 2021
06dd59c
added conditional insert for cassandra
aliue Jul 2, 2021
d3587b6
initial persistence layer implementation
aliue Jul 7, 2021
f098dfb
fixed typo
aliue Jul 7, 2021
88a9d8e
added support for config store to persistence clients
aliue Jul 7, 2021
40a7fbc
initial dc client implementation
aliue Jul 19, 2021
f318ab6
moved dc client and finished updateValue impl
aliue Jul 20, 2021
28bf505
fixed UpdateValue for client
aliue Jul 22, 2021
cd83142
fixed exported definitions
aliue Jul 22, 2021
d816107
change to align toward code base conventions and other minor details
aliue Jul 22, 2021
12f4662
added timeouts for fetch and update
aliue Jul 22, 2021
0f80a06
initial tests
aliue Jul 27, 2021
97afde5
merge up to date with master branch
aliue Jul 27, 2021
0eb03d6
conform to conventions with new update
aliue Jul 27, 2021
dd4d58d
added persistence tests (initial set) along with cassandra schema update
aliue Jul 27, 2021
3f1a02d
config store client tests, unresolved map test issue
aliue Jul 30, 2021
5cfabb7
fixed variable naming
aliue Jul 30, 2021
3d82055
added configStore functions to adminHandler
aliue Aug 3, 2021
07481e9
improved admin handler and changed GetDynamicConfigRequest struct
aliue Aug 3, 2021
3f298c0
added cli commands for config store
aliue Aug 5, 2021
87c29da
added dc client unit tests for Restore and List Value, fixed client l…
aliue Aug 5, 2021
d322e6f
shifted configstoreclientconfig to new package, added some adminHandl…
aliue Aug 6, 2021
5f2970f
fixed runtime panic error and improved cli interaction
aliue Aug 10, 2021
820b6d7
made changes to idl schema and fixed restore behavior
aliue Aug 11, 2021
8add81a
fixed client test race conditions
aliue Aug 11, 2021
f9e4309
realized still needed list request and cli get with no filter specifi…
aliue Aug 11, 2021
9f592a3
select dc client impl based on config file rather than hardcoded default
aliue Aug 11, 2021
22f115c
commiting .gen changes
aliue Aug 12, 2021
0c40f8c
change back to official idl repo
aliue Aug 12, 2021
bf6ae43
added back in configStore thrift
aliue Aug 12, 2021
e4a5654
fixed metric test error
aliue Aug 12, 2021
64ff26b
changes made to reflect idl changes
aliue Aug 12, 2021
e11a480
improvements such as backward dc client config compatibility, default…
aliue Aug 13, 2021
ea50165
added case to persistence test base for sql
aliue Aug 13, 2021
9754275
Revert "added case to persistence test base for sql"
aliue Aug 13, 2021
467261a
added skips to config store persistence test for non-Cass dbs
aliue Aug 13, 2021
c9076f5
changes to config yaml files
aliue Aug 13, 2021
6a847fe
bring config store branch up to date with master
aliue Aug 13, 2021
a0874cf
cli flags.go changes
aliue Aug 13, 2021
9748c8d
regenerated mocks
aliue Aug 13, 2021
decf85d
regenerated mocks
aliue Aug 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improved admin handler and changed GetDynamicConfigRequest struct
  • Loading branch information
aliue committed Aug 3, 2021
commit 07481e9580f6bdd7ed6cbed7f7eedc34215445c4
2 changes: 2 additions & 0 deletions common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func (csc *configStoreClient) UpdateValue(name dc.Key, value interface{}) error
}

func (csc *configStoreClient) RestoreValue(name dc.Key, filters map[dc.Filter]interface{}) error {
//if empty filter provided, update fallback value.
//if u want to remove entire entry, just do update value with empty
currentCached := csc.values.Load().(cacheEntry)
if currentCached.dcEntries == nil {
return dc.NotFoundError
Expand Down
12 changes: 12 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,14 @@ const (
AdminDescribeShardDistributionScope
// AdminGetCrossClusterTasksScope is the metric scope for admin.GetCrossClusterTasks
AdminGetCrossClusterTasksScope
// AdminGetDynamicConfigScope is the metric scope for admin.GetDynamicConfig
AdminGetDynamicConfigScope
// AdminUpdateDynamicConfigScope is the metric scope for admin.UpdateDynamicConfig
AdminUpdateDynamicConfigScope
// AdminRestoreDynamicConfigScope is the metric scope for admin.RestoreDynamicConfig
AdminRestoreDynamicConfigScope
// AdminListDynamicConfigScope is the metric scope for admin.ListDynamicConfig
AdminListDynamicConfigScope

NumAdminScopes
)
Expand Down Expand Up @@ -1447,6 +1455,10 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
AdminRefreshWorkflowTasksScope: {operation: "RefreshWorkflowTasks"},
AdminResendReplicationTasksScope: {operation: "ResendReplicationTasks"},
AdminGetCrossClusterTasksScope: {operation: "AdminGetCrossClusterTasksScope"},
AdminGetDynamicConfigScope: {operation: "AdminGetDynamicConfigScope"},
AdminUpdateDynamicConfigScope: {operation: "AdminUpdateDynamicConfigScope"},
AdminRestoreDynamicConfigScope: {operation: "AdminRestoreDynamicConfigScope"},
AdminListDynamicConfigScope: {operation: "AdminListDynamicConfigScope"},

FrontendStartWorkflowExecutionScope: {operation: "StartWorkflowExecution"},
FrontendPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
Expand Down
10 changes: 9 additions & 1 deletion common/types/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ func (v *RingInfo) GetMembers() (o []*HostInfo) {
}

type GetDynamicConfigRequest struct {
ConfigName string `json:"configName,omitempty"`
ConfigName string `json:"configName,omitempty"`
Filters []*DynamicConfigFilter `json:"filters,omitempty"`
}

func (v *GetDynamicConfigRequest) GetConfigName() (o string) {
Expand All @@ -416,6 +417,13 @@ func (v *GetDynamicConfigRequest) GetConfigName() (o string) {
return
}

func (v *GetDynamicConfigRequest) GetFilters() (o []*DynamicConfigFilter) {
if v != nil && v.Filters != nil {
return v.Filters
}
return
}

type GetDynamicConfigResponse struct {
Value *DataBlob `json:"value,omitempty"`
ValueSource string `json:"valueSource,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/proto/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func FromGetDynamicConfigRequest(t *types.GetDynamicConfigRequest) *adminv1.GetD
}
return &adminv1.GetDynamicConfigRequest{
ConfigName: t.ConfigName,
Filters: FromDynamicConfigFilterArray(t.Filters),
}
}

Expand All @@ -740,6 +741,7 @@ func ToGetDynamicConfigRequest(t *adminv1.GetDynamicConfigRequest) *types.GetDyn
}
return &types.GetDynamicConfigRequest{
ConfigName: t.ConfigName,
Filters: ToDynamicConfigFilterArray(t.Filters),
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func FromGetDynamicConfigRequest(t *types.GetDynamicConfigRequest) *admin.GetDyn
}
return &admin.GetDynamicConfigRequest{
ConfigName: &t.ConfigName,
Filters: FromDynamicConfigFilterArray(t.Filters),
}
}

Expand All @@ -485,6 +486,7 @@ func ToGetDynamicConfigRequest(t *admin.GetDynamicConfigRequest) *types.GetDynam
}
return &types.GetDynamicConfigRequest{
ConfigName: t.GetConfigName(),
Filters: ToDynamicConfigFilterArray(t.Filters),
}
}

Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated 1 files
+1 −0 thrift/admin.thrift
1 change: 1 addition & 0 deletions proto/internal/uber/cadence/admin/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ will be returned. If get_all is False and a filter(s) is specified, value that m
If config_name cannot be found in database, default value will be returned. */
message GetDynamicConfigRequest {
string config_name = 1;
repeated DynamicConfigFilter filters = 2;
}

message GetDynamicConfigResponse {
Expand Down
95 changes: 76 additions & 19 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1259,20 +1259,39 @@ func deserializeRawHistoryToken(bytes []byte) (*getWorkflowRawHistoryV2Token, er
return token, err
}

func (adh *adminHandlerImpl) GetDynamicConfig(ctx context.Context, request *types.GetDynamicConfigRequest) (*types.GetDynamicConfigResponse, error) {
func (adh *adminHandlerImpl) GetDynamicConfig(ctx context.Context, request *types.GetDynamicConfigRequest) (_ *types.GetDynamicConfigResponse, retError error) {
defer log.CapturePanic(adh.GetLogger(), &retError)
scope, sw := adh.startRequestProfile(metrics.AdminGetDynamicConfigScope)
defer sw.Stop()

if request == nil || request.ConfigName == "" {
return nil, adh.error(errRequestNotSet, scope)
}

keyVal, err := checkValidKey(request.ConfigName)
if err != nil {
return nil, errors.New("invalid dynamic config parameter name")
return nil, adh.error(err, scope)
}

value, err := adh.params.DynamicConfig.GetValue(keyVal, nil)
if err != nil {
return nil, err
var value interface{}
if request.Filters == nil {
value, err = adh.params.DynamicConfig.GetValue(keyVal, nil)
if err != nil {
return nil, adh.error(err, scope)
}
} else {
convFilters, err := convertFilterMapToList(request.Filters)
if err != nil {
return nil, adh.error(err, scope)
}
value, err = adh.params.DynamicConfig.GetValueWithFilters(keyVal, convFilters, nil)
}

data, err := json.Marshal(value)
if err != nil {
return nil, err
return nil, adh.error(err, scope)
}

return &types.GetDynamicConfigResponse{
Value: &types.DataBlob{
EncodingType: types.EncodingTypeJSON.Ptr(),
Expand All @@ -1282,39 +1301,64 @@ func (adh *adminHandlerImpl) GetDynamicConfig(ctx context.Context, request *type
}, nil
}

func (adh *adminHandlerImpl) UpdateDynamicConfig(ctx context.Context, request *types.UpdateDynamicConfigRequest) error {
func (adh *adminHandlerImpl) UpdateDynamicConfig(ctx context.Context, request *types.UpdateDynamicConfigRequest) (retError error) {
defer log.CapturePanic(adh.GetLogger(), &retError)
scope, sw := adh.startRequestProfile(metrics.AdminUpdateDynamicConfigScope)
defer sw.Stop()

if request == nil {
return adh.error(errRequestNotSet, scope)
}

keyVal, err := checkValidKey(request.ConfigName)
if err != nil {
return errors.New("invalid dynamic config parameter name")
return adh.error(err, scope)
}

return adh.params.DynamicConfig.UpdateValue(keyVal, request.ConfigValues)
}

func (adh *adminHandlerImpl) RestoreDynamicConfig(ctx context.Context, request *types.RestoreDynamicConfigRequest) error {
func (adh *adminHandlerImpl) RestoreDynamicConfig(ctx context.Context, request *types.RestoreDynamicConfigRequest) (retError error) {
defer log.CapturePanic(adh.GetLogger(), &retError)
scope, sw := adh.startRequestProfile(metrics.AdminRestoreDynamicConfigScope)
defer sw.Stop()

if request == nil || request.ConfigName == "" {
return adh.error(errRequestNotSet, scope)
}

keyVal, err := checkValidKey(request.ConfigName)
if err != nil {
return errors.New("invalid dynamic config parameter name")
return adh.error(err, scope)
}

filters := make(map[dc.Filter]interface{})
for _, filter := range request.Filters {
val, err := convertFromDataBlob(filter.Value)
var filters map[dc.Filter]interface{}

if request.Filters == nil {
filters = nil
} else {
filters, err = convertFilterMapToList(request.Filters)
if err != nil {
return err
return adh.error(errInvalidFilters, scope)
}
filters[dc.ParseFilter(filter.Name)] = val
}

return adh.params.DynamicConfig.RestoreValue(keyVal, filters)
}

func (adh *adminHandlerImpl) ListDynamicConfig(ctx context.Context, request *types.ListDynamicConfigRequest) (*types.ListDynamicConfigResponse, error) {
func (adh *adminHandlerImpl) ListDynamicConfig(ctx context.Context, request *types.ListDynamicConfigRequest) (_ *types.ListDynamicConfigResponse, retError error) {
defer log.CapturePanic(adh.GetLogger(), &retError)
scope, sw := adh.startRequestProfile(metrics.AdminListDynamicConfigScope)
defer sw.Stop()

if request == nil {
return nil, adh.error(errRequestNotSet, scope)
}

keyVal, err := checkValidKey(request.ConfigName)
if err != nil {
entries, err2 := adh.params.DynamicConfig.ListValue(dc.UnknownKey)
if err2 != nil {
err = err2
err = adh.error(err2, scope)
}
return &types.ListDynamicConfigResponse{
Entries: entries,
Expand All @@ -1323,7 +1367,7 @@ func (adh *adminHandlerImpl) ListDynamicConfig(ctx context.Context, request *typ

entries, err2 := adh.params.DynamicConfig.ListValue(keyVal)
if err2 != nil {
err = err2
err = adh.error(err2, scope)
}
return &types.ListDynamicConfigResponse{
Entries: entries,
Expand All @@ -1348,3 +1392,16 @@ func convertFromDataBlob(blob *types.DataBlob) (interface{}, error) {
return nil, errors.New("unsupported blob encoding")
}
}

func convertFilterMapToList(filters []*types.DynamicConfigFilter) (map[dc.Filter]interface{}, error) {
newFilters := make(map[dc.Filter]interface{})

for _, filter := range filters {
val, err := convertFromDataBlob(filter.Value)
if err != nil {
return nil, err
}
newFilters[dc.ParseFilter(filter.Name)] = val
}
return newFilters, nil
}
1 change: 1 addition & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ var (
errEmptyReplicationInfo = &types.BadRequestError{Message: "Replication task info is not set."}
errEmptyQueueType = &types.BadRequestError{Message: "Queue type is not set."}
errShuttingDown = &types.InternalServiceError{Message: "Shutting down"}
errInvalidFilters = &types.BadRequestError{Message: "Request Filters are invalid, unable to parse."}

// err for archival
errHistoryNotFound = &types.BadRequestError{Message: "Requested workflow history not found, may have passed retention period."}
Expand Down