Skip to content

Commit

Permalink
Merge branch 'master' into fix_tikv_11463
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Dec 14, 2021
2 parents d6c55d2 + 3c626f2 commit 0b38d02
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 3 deletions.
89 changes: 89 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ type Region struct {
PendingPeers []*metapb.Peer
}

// GlobalConfigItem standard format of KV pair in GlobalConfig client
type GlobalConfigItem struct {
Name string
Value string
Error error
}

// Client is a PD (Placement Driver) client.
// It should not be used after calling Close().
type Client interface {
Expand Down Expand Up @@ -109,6 +116,13 @@ type Client interface {
SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error)
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)

// LoadGlobalConfig gets the global config from etcd
LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error
// Close closes the client.
Expand Down Expand Up @@ -307,6 +321,8 @@ var (
errClosing = errors.New("[pd] closing")
// errTSOLength is returned when the number of response timestamps is inconsistent with request.
errTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
// errGlobalConfigNotFound is returned when etcd does not contain the globalConfig item
errGlobalConfigNotFound = errors.New("[pd] global config not found")
)

// ClientOption configures client.
Expand Down Expand Up @@ -1745,3 +1761,76 @@ func trimHTTPPrefix(str string) string {
str = strings.TrimPrefix(str, "https://")
return str
}

func (c *client) LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names})
if err != nil {
return nil, err
}
res := make([]GlobalConfigItem, len(resp.GetItems()))
for i, item := range resp.GetItems() {
cfg := GlobalConfigItem{Name: item.GetName()}
if item.Error != nil {
if item.Error.Type == pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND {
cfg.Error = errGlobalConfigNotFound
} else {
cfg.Error = errors.New("[pd]" + item.Error.Message)
}
} else {
cfg.Value = item.GetValue()
}
res[i] = cfg
}
return res, nil
}

func (c *client) StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error {
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value}
}
res, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr})
if err != nil {
return err
}
resErr := res.GetError()
if resErr != nil {
return errors.Errorf("[pd]" + resErr.Message)
}
return err
}

func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) {
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{})
if err != nil {
close(globalConfigWatcherCh)
return nil, err
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Error("[pd] panic in client `WatchGlobalConfig`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
close(globalConfigWatcherCh)
return
default:
m, err := res.Recv()
if err != nil {
return
}
arr := make([]GlobalConfigItem, len(m.Changes))
for j, i := range m.Changes {
arr[j] = GlobalConfigItem{i.GetName(), i.GetValue(), nil}
}
globalConfigWatcherCh <- arr
}
}
}()
return globalConfigWatcherCh, err
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20211206031355-bcc43a01d537
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc h1:6goyJr/7qam8KgDLgOd3k2BQAjtPlg+w22YdgClBlIk=
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c h1:jrPg+QFqQ7VyI30SPzB0ZviHCvDGyZHiASz6Bgomxi0=
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
87 changes: 87 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/kv"
"github.com/tikv/pd/server/tso"
"github.com/tikv/pd/server/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -1698,3 +1700,88 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}
<-done
}

const globalConfigPath = "/global/config/"

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
value := item.GetValue()
ops[i] = clientv3.OpPut(name, value)
}
res, err :=
kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
if err != nil {
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}, err
}
if !res.Succeeded {
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: "failed to execute StoreGlobalConfig transaction"}}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
}
return &pdpb.StoreGlobalConfigResponse{}, err
}

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
names := request.Names
res := make([]*pdpb.GlobalConfigItem, len(names))
for i, name := range names {
r, err := s.client.Get(ctx, globalConfigPath+name)
if err != nil {
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
} else if len(r.Kvs) == 0 {
msg := "key " + name + " not found"
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
}
}
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}

// WatchGlobalConfig if the connection of WatchGlobalConfig is end
// or stoped by whatever reason
// just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(request *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
if err != nil {
return err
}
watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
for {
select {
case <-ctx.Done():
return nil
case res := <-watchChan:
cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
for _, e := range res.Events {
if e.Type != clientv3.EventTypePut {
continue
}
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
}
if len(cfgs) > 0 {
err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
if err != nil {
return err
}
}
}
}
}

func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
if err != nil {
return err
}
ls := make([]*pdpb.GlobalConfigItem, configList.Count)
for i, kv := range configList.Kvs {
ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)}
}
err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls})
return err
}
5 changes: 5 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,3 +1425,8 @@ func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) e
}
return nil
}

// SplitAndScatterRegions TODO
func (s *Server) SplitAndScatterRegions(context context.Context, r *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) {
return nil, errors.New("no implemented")
}
Loading

0 comments on commit 0b38d02

Please sign in to comment.