Skip to content

Commit

Permalink
support resource group watch
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 5, 2023
1 parent f65ad52 commit 4366236
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 3 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ type Client interface {

// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// ResourceManagerClient manages resource manager metadata.
ResourceManagerClient
// Close closes the client.
Close()
}
Expand Down
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ require (
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.43.0
)

replace github.com/pingcap/kvproto => github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f
74 changes: 74 additions & 0 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"github.com/pingcap/kvproto/pkg/keyspacepb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// ResourceManagerClient manages keyspace metadata.
type ResourceManagerClient interface {
// WatchResourceGroup watches keyspace meta changes.
WatchResourceGroup(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error)
}

// ResourceManagerClient returns the ResourceManagerClient from current PD leader.
func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return rmpb.NewResourceManagerClient(cc.(*grpc.ClientConn))
}
return nil
}

// WatchResourceGroup watches keyspace meta changes.
// It returns a stream of slices of keyspace metadata.
// The first message in stream contains all current keyspaceMeta,
// all subsequent messages contains new put events for all keyspaces.
func (c *client) WatchResourceGroup(ctx context.Context) (chan []*rmpb.ResourceGroup, error) {
resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
req := &rmpb.WatchResourceGroupRequest{}
stream, err := c.resourceManagerClient().WatchResourceGroup(ctx, req)
if err != nil {
close(resourceGroupWatcherChan)
return nil, err
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Error("[pd] panic in keyspace client `WatchResourceGroups`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
close(resourceGroupWatcherChan)
return
default:
resp, err := stream.Recv()
if err != nil {
return
}
resourceGroupWatcherChan <- resp.Group
}
}
}()
return resourceGroupWatcherChan, err
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,4 @@ require (
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/nolouch/kvproto v0.0.0-20230104035728-9dcd8bcfcf2b
replace github.com/pingcap/kvproto => github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/AlekSi/gocov-xml v1.0.0 h1:4QctJBgXEkbzeKz6PJy6bt3JSPNSN4I2mITYW+eKUo
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f h1:R2X25LSHyaEi3AZC+QPJ38w3zIsOPGRTAmgbhAiXKb8=
github.com/HuSharp/kvproto v0.0.0-20230104081348-c74b371d287f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down Expand Up @@ -330,8 +332,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY=
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nolouch/kvproto v0.0.0-20230104035728-9dcd8bcfcf2b h1:2si08qfbhPfz6ZxKmt1tNmp2FDQgh5xTJPELL/Jj0+k=
github.com/nolouch/kvproto v0.0.0-20230104035728-9dcd8bcfcf2b/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oleiade/reflections v1.0.1 h1:D1XO3LVEYroYskEsoSiGItp9RUxG6jWnCVvrqH0HHQM=
github.com/oleiade/reflections v1.0.1/go.mod h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60=
Expand Down
68 changes: 68 additions & 0 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package server

import (
"context"
"encoding/json"
"github.com/tikv/pd/pkg/utils/syncutil"
"net/http"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/server"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)

Expand All @@ -44,6 +47,14 @@ type Service struct {
ctx context.Context
manager *Manager
// settings
*server.GrpcServer
// Backing storage for key dictionary.
etcdClient *clientv3.Client
mu struct {
syncutil.Mutex
// Revision of keys loaded from etcd. Guarded by mu.
groupsRevision int64
}
}

// NewService creates a new resource manager service.
Expand Down Expand Up @@ -72,6 +83,12 @@ func (s *Service) GetManager() *Manager {
return s.manager
}

func (s *Service) groupsRevision() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.groupsRevision
}

// GetResourceGroup implements ResourceManagerServer.GetResourceGroup.
func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGroupRequest) (*rmpb.GetResourceGroupResponse, error) {
rg := s.manager.GetResourceGroup(req.ResourceGroupName)
Expand Down Expand Up @@ -114,6 +131,57 @@ func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResou
return &rmpb.DeleteResourceGroupResponse{Body: "Success!"}, nil
}

// WatchResourceGroup implements ResourceManagerServer.DeleteResourceGroup.
func (s *Service) WatchResourceGroup(request *rmpb.WatchResourceGroupRequest, stream rmpb.ResourceManager_WatchResourceGroupServer) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := s.sendAllResourceGroups(ctx, stream)
if err != nil {
return err
}
watchChan := s.etcdClient.Watch(ctx, GroupSettingsPathPrefix, clientv3.WithPrefix(), clientv3.WithRev(s.groupsRevision()))
for {
select {
case <-ctx.Done():
return nil
case res := <-watchChan:
resourceGroups := make([]*rmpb.ResourceGroup, 0, len(res.Events))
for _, event := range res.Events {
if event.Type != clientv3.EventTypePut {
continue
}
var group ResourceGroup
if err := json.Unmarshal(event.Kv.Value, &group); err != nil {
panic(err)
}
resourceGroups = append(resourceGroups, group.IntoProtoResourceGroup())
}
if len(resourceGroups) > 0 {
if err = stream.Send(&rmpb.WatchResourceGroupResponse{Group: resourceGroups}); err != nil {
return err
}
}
}
}
}

func (s *Service) sendAllResourceGroups(ctx context.Context, stream rmpb.ResourceManager_WatchResourceGroupServer) error {
getResp, err := s.etcdClient.Get(ctx, GroupSettingsPathPrefix, clientv3.WithPrefix())
if err != nil {
return err
}
groups := make([]*rmpb.ResourceGroup, getResp.Count)
for i, kv := range getResp.Kvs {
var group ResourceGroup
if err = json.Unmarshal(kv.Value, group); err != nil {
return err
}
groups[i] = group.IntoProtoResourceGroup()
}
return stream.Send(&rmpb.WatchResourceGroupResponse{Group: groups})
}

// ModifyResourceGroup implements ResourceManagerServer.ModifyResourceGroup.
func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) {
err := s.manager.ModifyResourceGroup(req.GetGroup())
Expand Down

0 comments on commit 4366236

Please sign in to comment.