Skip to content

Commit a213ae6

Browse files
LemonHXcrazycs520ti-chi-bot
authored andcommitted
implement GlobalConfig for grpc server and client (tikv#4308)
* add `global_config` for grpc server and client dummy implement `SplitAndScatterRegions` and modify the kv proto deps for build batch send and store globalconfig by using transaction close tikv#4443 Co-authored-by: crazycs <crazycs520@gmail.com> Signed-off-by: lemonhx <lemonhx@lemonhx.tech> * remove unsafe code from global config test rearrange code from global config client Signed-off-by: lemonhx <lemonhx@lemonhx.tech> * check transaction response in store rename variable `receiver` into `globalConfigWatcherCH` due to it cause ambiguity of variable meaning Signed-off-by: lemonhx <lemonhx@lemonhx.tech> * merged kvproto Signed-off-by: lemonhx <lemonhx@lemonhx.tech> Co-authored-by: crazycs <crazycs520@gmail.com> Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io> Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
1 parent 5d006a4 commit a213ae6

File tree

6 files changed

+391
-3
lines changed

6 files changed

+391
-3
lines changed

client/client.go

+89
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ type Region struct {
4646
PendingPeers []*metapb.Peer
4747
}
4848

49+
// GlobalConfigItem standard format of KV pair in GlobalConfig client
50+
type GlobalConfigItem struct {
51+
Name string
52+
Value string
53+
Error error
54+
}
55+
4956
// Client is a PD (Placement Driver) client.
5057
// It should not be used after calling Close().
5158
type Client interface {
@@ -109,6 +116,13 @@ type Client interface {
109116
SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error)
110117
// GetOperator gets the status of operator of the specified region.
111118
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
119+
120+
// LoadGlobalConfig gets the global config from etcd
121+
LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error)
122+
// StoreGlobalConfig set the config from etcd
123+
StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error
124+
// WatchGlobalConfig returns an stream with all global config and updates
125+
WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error)
112126
// UpdateOption updates the client option.
113127
UpdateOption(option DynamicOption, value interface{}) error
114128
// Close closes the client.
@@ -307,6 +321,8 @@ var (
307321
errClosing = errors.New("[pd] closing")
308322
// errTSOLength is returned when the number of response timestamps is inconsistent with request.
309323
errTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
324+
// errGlobalConfigNotFound is returned when etcd does not contain the globalConfig item
325+
errGlobalConfigNotFound = errors.New("[pd] global config not found")
310326
)
311327

312328
// ClientOption configures client.
@@ -1745,3 +1761,76 @@ func trimHTTPPrefix(str string) string {
17451761
str = strings.TrimPrefix(str, "https://")
17461762
return str
17471763
}
1764+
1765+
func (c *client) LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error) {
1766+
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names})
1767+
if err != nil {
1768+
return nil, err
1769+
}
1770+
res := make([]GlobalConfigItem, len(resp.GetItems()))
1771+
for i, item := range resp.GetItems() {
1772+
cfg := GlobalConfigItem{Name: item.GetName()}
1773+
if item.Error != nil {
1774+
if item.Error.Type == pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND {
1775+
cfg.Error = errGlobalConfigNotFound
1776+
} else {
1777+
cfg.Error = errors.New("[pd]" + item.Error.Message)
1778+
}
1779+
} else {
1780+
cfg.Value = item.GetValue()
1781+
}
1782+
res[i] = cfg
1783+
}
1784+
return res, nil
1785+
}
1786+
1787+
func (c *client) StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error {
1788+
resArr := make([]*pdpb.GlobalConfigItem, len(items))
1789+
for i, it := range items {
1790+
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value}
1791+
}
1792+
res, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr})
1793+
if err != nil {
1794+
return err
1795+
}
1796+
resErr := res.GetError()
1797+
if resErr != nil {
1798+
return errors.Errorf("[pd]" + resErr.Message)
1799+
}
1800+
return err
1801+
}
1802+
1803+
func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) {
1804+
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
1805+
res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{})
1806+
if err != nil {
1807+
close(globalConfigWatcherCh)
1808+
return nil, err
1809+
}
1810+
go func() {
1811+
defer func() {
1812+
if r := recover(); r != nil {
1813+
log.Error("[pd] panic in client `WatchGlobalConfig`", zap.Any("error", r))
1814+
return
1815+
}
1816+
}()
1817+
for {
1818+
select {
1819+
case <-ctx.Done():
1820+
close(globalConfigWatcherCh)
1821+
return
1822+
default:
1823+
m, err := res.Recv()
1824+
if err != nil {
1825+
return
1826+
}
1827+
arr := make([]GlobalConfigItem, len(m.Changes))
1828+
for j, i := range m.Changes {
1829+
arr[j] = GlobalConfigItem{i.GetName(), i.GetValue(), nil}
1830+
}
1831+
globalConfigWatcherCh <- arr
1832+
}
1833+
}
1834+
}()
1835+
return globalConfigWatcherCh, err
1836+
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ require (
3131
github.com/pingcap/errcode v0.3.0
3232
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
3333
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
34-
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc
34+
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c
3535
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7
3636
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
3737
github.com/pingcap/tidb-dashboard v0.0.0-20211206031355-bcc43a01d537

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
335335
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
336336
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
337337
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
338-
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc h1:6goyJr/7qam8KgDLgOd3k2BQAjtPlg+w22YdgClBlIk=
339-
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
338+
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c h1:jrPg+QFqQ7VyI30SPzB0ZviHCvDGyZHiASz6Bgomxi0=
339+
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
340340
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
341341
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
342342
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=

server/grpc_service.go

+87
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ import (
3434
"github.com/tikv/pd/pkg/tsoutil"
3535
"github.com/tikv/pd/server/cluster"
3636
"github.com/tikv/pd/server/core"
37+
"github.com/tikv/pd/server/kv"
3738
"github.com/tikv/pd/server/tso"
3839
"github.com/tikv/pd/server/versioninfo"
40+
"go.etcd.io/etcd/clientv3"
3941
"go.uber.org/zap"
4042
"google.golang.org/grpc"
4143
"google.golang.org/grpc/codes"
@@ -1698,3 +1700,88 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
16981700
}
16991701
<-done
17001702
}
1703+
1704+
const globalConfigPath = "/global/config/"
1705+
1706+
// StoreGlobalConfig store global config into etcd by transaction
1707+
func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
1708+
ops := make([]clientv3.Op, len(request.Changes))
1709+
for i, item := range request.Changes {
1710+
name := globalConfigPath + item.GetName()
1711+
value := item.GetValue()
1712+
ops[i] = clientv3.OpPut(name, value)
1713+
}
1714+
res, err :=
1715+
kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
1716+
if err != nil {
1717+
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}, err
1718+
}
1719+
if !res.Succeeded {
1720+
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: "failed to execute StoreGlobalConfig transaction"}}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
1721+
}
1722+
return &pdpb.StoreGlobalConfigResponse{}, err
1723+
}
1724+
1725+
// LoadGlobalConfig load global config from etcd
1726+
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
1727+
names := request.Names
1728+
res := make([]*pdpb.GlobalConfigItem, len(names))
1729+
for i, name := range names {
1730+
r, err := s.client.Get(ctx, globalConfigPath+name)
1731+
if err != nil {
1732+
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
1733+
} else if len(r.Kvs) == 0 {
1734+
msg := "key " + name + " not found"
1735+
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
1736+
} else {
1737+
res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
1738+
}
1739+
}
1740+
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
1741+
}
1742+
1743+
// WatchGlobalConfig if the connection of WatchGlobalConfig is end
1744+
// or stoped by whatever reason
1745+
// just reconnect to it.
1746+
func (s *GrpcServer) WatchGlobalConfig(request *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
1747+
ctx, cancel := context.WithCancel(s.Context())
1748+
defer cancel()
1749+
err := s.sendAllGlobalConfig(ctx, server)
1750+
if err != nil {
1751+
return err
1752+
}
1753+
watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
1754+
for {
1755+
select {
1756+
case <-ctx.Done():
1757+
return nil
1758+
case res := <-watchChan:
1759+
cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
1760+
for _, e := range res.Events {
1761+
if e.Type != clientv3.EventTypePut {
1762+
continue
1763+
}
1764+
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
1765+
}
1766+
if len(cfgs) > 0 {
1767+
err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
1768+
if err != nil {
1769+
return err
1770+
}
1771+
}
1772+
}
1773+
}
1774+
}
1775+
1776+
func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
1777+
configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
1778+
if err != nil {
1779+
return err
1780+
}
1781+
ls := make([]*pdpb.GlobalConfigItem, configList.Count)
1782+
for i, kv := range configList.Kvs {
1783+
ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)}
1784+
}
1785+
err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls})
1786+
return err
1787+
}

server/server.go

+5
Original file line numberDiff line numberDiff line change
@@ -1425,3 +1425,8 @@ func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) e
14251425
}
14261426
return nil
14271427
}
1428+
1429+
// SplitAndScatterRegions TODO
1430+
func (s *Server) SplitAndScatterRegions(context context.Context, r *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) {
1431+
return nil, errors.New("no implemented")
1432+
}

0 commit comments

Comments
 (0)