Skip to content

Commit

Permalink
support etcd and resource group
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 12, 2023
1 parent bac5bf4 commit 492c62c
Show file tree
Hide file tree
Showing 18 changed files with 4,030 additions and 349 deletions.
38 changes: 22 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type Region struct {

// GlobalConfigItem standard format of KV pair in GlobalConfig client
type GlobalConfigItem struct {
Name string
Value string
Error error
ItemKind pdpb.ItemKind
Name string
Value string
Error error
}

// Client is a PD (Placement Driver) client.
Expand Down Expand Up @@ -122,11 +123,11 @@ type Client interface {
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)

// LoadGlobalConfig gets the global config from etcd
LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error)
LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error)
WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error

Expand Down Expand Up @@ -1822,10 +1823,10 @@ func trimHTTPPrefix(str string) string {
return str
}

func (c *client) LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names})
func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath})
if err != nil {
return nil, err
return nil, 0, err
}
res := make([]GlobalConfigItem, len(resp.GetItems()))
for i, item := range resp.GetItems() {
Expand All @@ -1841,15 +1842,16 @@ func (c *client) LoadGlobalConfig(ctx context.Context, names []string) ([]Global
}
res[i] = cfg
}
return res, nil
return res, resp.GetRevision(), nil
}

func (c *client) StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error {
func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value}
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.ItemKind}
}
res, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr})
fmt.Printf("GlobalConfigItem %+v", resArr)
res, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath})
if err != nil {
return err
}
Expand All @@ -1860,9 +1862,13 @@ func (c *client) StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem
return err
}

func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) {
func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) {
// register watch components
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{})
res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{
ConfigPath: configPath,
Revision: revision,
})
if err != nil {
close(globalConfigWatcherCh)
return nil, err
Expand All @@ -1886,7 +1892,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem
}
arr := make([]GlobalConfigItem, len(m.Changes))
for j, i := range m.Changes {
arr[j] = GlobalConfigItem{i.GetName(), i.GetValue(), nil}
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue(), nil}
}
globalConfigWatcherCh <- arr
}
Expand Down
7 changes: 4 additions & 3 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ module github.com/tikv/pd/client
go 1.16

require (
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230110033234-055843a0a07d
github.com/pingcap/kvproto v0.0.0-20230111073505-de69cb94beae
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.1
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.43.0
google.golang.org/grpc v1.51.0
)
Loading

0 comments on commit 492c62c

Please sign in to comment.