Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support resource group watch #5830

Merged
merged 12 commits into from
Jan 19, 2023
61 changes: 56 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1821,18 +1821,69 @@ func trimHTTPPrefix(str string) string {
}

func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) {
// TODO: complete this function with new implementation.
return nil, 0, nil
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath})
if err != nil {
return nil, 0, err
}

res := make([]GlobalConfigItem, len(resp.GetItems()))
for i, item := range resp.GetItems() {
cfg := GlobalConfigItem{Name: item.GetName()}
cfg.Value = item.GetValue()
res[i] = cfg
}
return res, resp.GetRevision(), nil
}

func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
// TODO: complete this function with new implementation.
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType}
}
_, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath})
if err != nil {
return err
}
return nil
}

func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) {
// TODO: complete this function with new implementation.
return nil, nil
// TODO: Add retry mechanism
// register watch components there
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to return the max revision of each []GlobalConfigItem so the caller can do incremental retry?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since resource client is just for testing, I will add a retry mechanism after I solve pd client/tidb/CDC compatibility...

res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{
ConfigPath: configPath,
Revision: revision,
})
if err != nil {
close(globalConfigWatcherCh)
return nil, err
}
go func() {
defer func() {
close(globalConfigWatcherCh)
if r := recover(); r != nil {
log.Error("[pd] panic in client `WatchGlobalConfig`", zap.Any("error", r))
return
}
}()
for {
m, err := res.Recv()
if err != nil {
return
}
arr := make([]GlobalConfigItem, len(m.Changes))
for j, i := range m.Changes {
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue()}
}
select {
case <-ctx.Done():
return
case globalConfigWatcherCh <- arr:
}
}
}()
return globalConfigWatcherCh, err
}

func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
Expand Down
3 changes: 2 additions & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ module github.com/tikv/pd/client
go 1.16

require (
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
60 changes: 55 additions & 5 deletions client/resourcemanager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
Expand All @@ -28,8 +30,9 @@ import (
type actionType int

const (
add actionType = 0
modify actionType = 1
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
)

// ResourceManagerClient manages resource group info and token request.
Expand All @@ -39,6 +42,7 @@ type ResourceManagerClient interface {
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
}

Expand Down Expand Up @@ -124,12 +128,58 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri
return resp.GetBody(), nil
}

// WatchResourceGroup [just for TEST] watches resource groups changes.
// It returns a stream of slices of resource groups.
// The first message in stream contains all current resource groups,
// all subsequent messages contains new events[PUT/DELETE] for all resource groups.
func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
configChan, err := c.WatchGlobalConfig(ctx, groupSettingsPathPrefix, revision)
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
go func() {
defer func() {
close(resourceGroupWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
case res, ok := <-configChan:
if !ok {
return
}
groups := make([]*rmpb.ResourceGroup, 0, len(res))
for _, item := range res {
switch item.EventType {
case pdpb.EventType_PUT:
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal([]byte(item.Value), group); err != nil {
return
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
}
groups = append(groups, group)
case pdpb.EventType_DELETE:
continue
}
}
resourceGroupWatcherChan <- groups
}
}
}()
return resourceGroupWatcherChan, err
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
requestCtx: ctx,
clientCtx: c.ctx,
Requeset: request,
Request: request,
}
c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req
grantedTokens, err := req.Wait()
Expand All @@ -143,7 +193,7 @@ type tokenRequest struct {
clientCtx context.Context
requestCtx context.Context
done chan error
Requeset *rmpb.TokenBucketsRequest
Request *rmpb.TokenBucketsRequest
TokenBuckets []*rmpb.TokenBucketResponse
}

Expand Down Expand Up @@ -232,7 +282,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb
}

func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error {
req := t.Requeset
req := t.Request
if err := stream.Send(req); err != nil {
err = errors.WithStack(err)
t.done <- err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand Down
75 changes: 39 additions & 36 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ const (
// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second

// global config
globalConfigPath = "/global/config/"
)

// gRPC errors
Expand Down Expand Up @@ -1888,9 +1885,13 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
value := item.GetValue()
ops[i] = clientv3.OpPut(name, value)
name := item.GetName()
switch item.GetKind() {
case pdpb.EventType_PUT:
ops[i] = clientv3.OpPut(s.GetFinalPathWithinPD(request.GetConfigPath()+name), item.GetValue())
case pdpb.EventType_DELETE:
ops[i] = clientv3.OpDelete(s.GetFinalPathWithinPD(request.GetConfigPath() + name))
}
}
res, err :=
kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
Expand All @@ -1900,61 +1901,63 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo
if !res.Succeeded {
return &pdpb.StoreGlobalConfigResponse{}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
}
return &pdpb.StoreGlobalConfigResponse{}, err
return &pdpb.StoreGlobalConfigResponse{}, nil
}

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
// TODO: complete this function with new implementation
return &pdpb.LoadGlobalConfigResponse{}, nil
configPath := request.GetConfigPath()
r, err := s.client.Get(ctx, s.GetFinalPathWithinPD(configPath), clientv3.WithPrefix())
if err != nil {
return &pdpb.LoadGlobalConfigResponse{}, err
}
res := make([]*pdpb.GlobalConfigItem, len(r.Kvs))
for i, value := range r.Kvs {
res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Value: string(value.Value)}
}
return &pdpb.LoadGlobalConfigResponse{Items: res, Revision: r.Header.GetRevision()}, nil
}

// WatchGlobalConfig if the connection of WatchGlobalConfig is end
// or stoped by whatever reason
// just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
// or stopped by whatever reason, just reconnect to it.
// Watch on revision which greater than or equal to the required revision.
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
if err != nil {
return err
}
watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
revision := req.GetRevision()
// If the revision is compacted, will meet required revision has been compacted error.
// - If required revision < CompactRevision, we need to reload all configs to avoid losing data.
// - If required revision >= CompactRevision, just keep watching.
watchChan := s.client.Watch(ctx, s.GetFinalPathWithinPD(req.GetConfigPath()), clientv3.WithPrefix(), clientv3.WithRev(revision))
for {
select {
case <-ctx.Done():
return nil
case res := <-watchChan:
if revision < res.CompactRevision {
if err := server.Send(&pdpb.WatchGlobalConfigResponse{
Revision: res.CompactRevision,
Header: s.wrapErrorToHeader(pdpb.ErrorType_DATA_COMPACTED,
fmt.Sprintf("required watch revision: %d is smaller than current compact/min revision. %d", revision, res.CompactRevision)),
}); err != nil {
return err
}
}
revision = res.Header.GetRevision()

cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
for _, e := range res.Events {
if e.Type != clientv3.EventTypePut {
continue
}
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value), Kind: pdpb.EventType(e.Type)})
}
if len(cfgs) > 0 {
err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
if err != nil {
if err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs, Revision: res.Header.GetRevision()}); err != nil {
return err
}
}
}
}
}

func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
if err != nil {
return err
}
ls := make([]*pdpb.GlobalConfigItem, configList.Count)
for i, kv := range configList.Kvs {
ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)}
}
err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls})
return err
}

// Evict the leaders when the store is damaged. Damaged regions are emergency errors
// and requires user to manually remove the `evict-leader-scheduler` with pd-ctl
func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) {
Expand Down
5 changes: 5 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,11 @@ func (s *Server) GetHTTPClient() *http.Client {
return s.httpClient
}

// GetFinalPathWithinPD returns the etcd path.
func (s *Server) GetFinalPathWithinPD(configPath string) string {
return strings.Join([]string{s.rootPath, configPath}, "/")
}

// GetLeader returns the leader of PD cluster(i.e the PD leader).
func (s *Server) GetLeader() *pdpb.Member {
return s.member.GetLeader()
Expand Down
Loading