From 4366236b34ff87365d4bd55963d5fb4ffb8a1e05 Mon Sep 17 00:00:00 2001 From: husharp Date: Thu, 5 Jan 2023 09:44:55 +0800 Subject: [PATCH] support resource group watch Signed-off-by: husharp --- client/client.go | 2 + client/go.mod | 2 + client/resource_manager_client.go | 74 +++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- .../resource_manager/server/grpc_service.go | 68 +++++++++++++++++ 6 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 client/resource_manager_client.go diff --git a/client/client.go b/client/client.go index b7e15fe6eb23..57df7ede43b0 100644 --- a/client/client.go +++ b/client/client.go @@ -137,6 +137,8 @@ type Client interface { // KeyspaceClient manages keyspace metadata. KeyspaceClient + // ResourceManagerClient manages resource manager metadata. + ResourceManagerClient // Close closes the client. Close() } diff --git a/client/go.mod b/client/go.mod index 09277d77a4d9..79dff615f93e 100644 --- a/client/go.mod +++ b/client/go.mod @@ -14,3 +14,5 @@ require ( go.uber.org/zap v1.20.0 google.golang.org/grpc v1.43.0 ) + +replace github.com/pingcap/kvproto => github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go new file mode 100644 index 000000000000..05e6510cb8fa --- /dev/null +++ b/client/resource_manager_client.go @@ -0,0 +1,74 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "github.com/pingcap/kvproto/pkg/keyspacepb" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// ResourceManagerClient manages keyspace metadata. +type ResourceManagerClient interface { + // WatchResourceGroup watches keyspace meta changes. + WatchResourceGroup(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) +} + +// ResourceManagerClient returns the ResourceManagerClient from current PD leader. +func (c *client) resourceManagerClient() rmpb.ResourceManagerClient { + if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok { + return rmpb.NewResourceManagerClient(cc.(*grpc.ClientConn)) + } + return nil +} + +// WatchResourceGroup watches keyspace meta changes. +// It returns a stream of slices of keyspace metadata. +// The first message in stream contains all current keyspaceMeta, +// all subsequent messages contains new put events for all keyspaces. +func (c *client) WatchResourceGroup(ctx context.Context) (chan []*rmpb.ResourceGroup, error) { + resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup) + req := &rmpb.WatchResourceGroupRequest{} + stream, err := c.resourceManagerClient().WatchResourceGroup(ctx, req) + if err != nil { + close(resourceGroupWatcherChan) + return nil, err + } + go func() { + defer func() { + if r := recover(); r != nil { + log.Error("[pd] panic in keyspace client `WatchResourceGroups`", zap.Any("error", r)) + return + } + }() + for { + select { + case <-ctx.Done(): + close(resourceGroupWatcherChan) + return + default: + resp, err := stream.Recv() + if err != nil { + return + } + resourceGroupWatcherChan <- resp.Group + } + } + }() + return resourceGroupWatcherChan, err +} diff --git a/go.mod b/go.mod index 1a0da186e61d..989a60033cf7 100644 --- a/go.mod +++ b/go.mod @@ -190,4 +190,4 @@ require ( // After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`. // replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch -replace github.com/pingcap/kvproto => github.com/nolouch/kvproto v0.0.0-20230104035728-9dcd8bcfcf2b +replace github.com/pingcap/kvproto => github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f diff --git a/go.sum b/go.sum index 582c0c3e37b7..6207ad1e9a54 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/AlekSi/gocov-xml v1.0.0 h1:4QctJBgXEkbzeKz6PJy6bt3JSPNSN4I2mITYW+eKUo github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f h1:R2X25LSHyaEi3AZC+QPJ38w3zIsOPGRTAmgbhAiXKb8= +github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -330,8 +332,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY= github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/nolouch/kvproto v0.0.0-20230104035728-9dcd8bcfcf2b h1:2si08qfbhPfz6ZxKmt1tNmp2FDQgh5xTJPELL/Jj0+k= -github.com/nolouch/kvproto v0.0.0-20230104035728-9dcd8bcfcf2b/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/oleiade/reflections v1.0.1 h1:D1XO3LVEYroYskEsoSiGItp9RUxG6jWnCVvrqH0HHQM= github.com/oleiade/reflections v1.0.1/go.mod h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60= diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index a0f9c1c5c38d..0a65998a542c 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -16,12 +16,15 @@ package server import ( "context" + "encoding/json" + "github.com/tikv/pd/pkg/utils/syncutil" "net/http" "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/server" + "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) @@ -44,6 +47,14 @@ type Service struct { ctx context.Context manager *Manager // settings + *server.GrpcServer + // Backing storage for key dictionary. + etcdClient *clientv3.Client + mu struct { + syncutil.Mutex + // Revision of keys loaded from etcd. Guarded by mu. + groupsRevision int64 + } } // NewService creates a new resource manager service. @@ -72,6 +83,12 @@ func (s *Service) GetManager() *Manager { return s.manager } +func (s *Service) groupsRevision() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.groupsRevision +} + // GetResourceGroup implements ResourceManagerServer.GetResourceGroup. func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGroupRequest) (*rmpb.GetResourceGroupResponse, error) { rg := s.manager.GetResourceGroup(req.ResourceGroupName) @@ -114,6 +131,57 @@ func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResou return &rmpb.DeleteResourceGroupResponse{Body: "Success!"}, nil } +// WatchResourceGroup implements ResourceManagerServer.DeleteResourceGroup. +func (s *Service) WatchResourceGroup(request *rmpb.WatchResourceGroupRequest, stream rmpb.ResourceManager_WatchResourceGroupServer) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := s.sendAllResourceGroups(ctx, stream) + if err != nil { + return err + } + watchChan := s.etcdClient.Watch(ctx, GroupSettingsPathPrefix, clientv3.WithPrefix(), clientv3.WithRev(s.groupsRevision())) + for { + select { + case <-ctx.Done(): + return nil + case res := <-watchChan: + resourceGroups := make([]*rmpb.ResourceGroup, 0, len(res.Events)) + for _, event := range res.Events { + if event.Type != clientv3.EventTypePut { + continue + } + var group ResourceGroup + if err := json.Unmarshal(event.Kv.Value, &group); err != nil { + panic(err) + } + resourceGroups = append(resourceGroups, group.IntoProtoResourceGroup()) + } + if len(resourceGroups) > 0 { + if err = stream.Send(&rmpb.WatchResourceGroupResponse{Group: resourceGroups}); err != nil { + return err + } + } + } + } +} + +func (s *Service) sendAllResourceGroups(ctx context.Context, stream rmpb.ResourceManager_WatchResourceGroupServer) error { + getResp, err := s.etcdClient.Get(ctx, GroupSettingsPathPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + groups := make([]*rmpb.ResourceGroup, getResp.Count) + for i, kv := range getResp.Kvs { + var group ResourceGroup + if err = json.Unmarshal(kv.Value, group); err != nil { + return err + } + groups[i] = group.IntoProtoResourceGroup() + } + return stream.Send(&rmpb.WatchResourceGroupResponse{Group: groups}) +} + // ModifyResourceGroup implements ResourceManagerServer.ModifyResourceGroup. func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) { err := s.manager.ModifyResourceGroup(req.GetGroup())