address comments
Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 committed Apr 11, 2023
1 parent c85cbb0 commit 2d186a4
Showing 12 changed files with 320 additions and 188 deletions.
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
@@ -82,7 +82,7 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
requests := make([]*CreateKeyspaceRequest, count)
for i := 0; i < count; i++ {
requests[i] = &CreateKeyspaceRequest{
Name: fmt.Sprintf("test_keyspace%d", i),
Name: fmt.Sprintf("test_keyspace_%d", i),
Config: map[string]string{
testConfig1: "100",
testConfig2: "200",
209 changes: 127 additions & 82 deletions pkg/keyspace/tso_keyspace_group.go
@@ -16,14 +16,14 @@ package keyspace

import (
"context"
"encoding/json"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/balancer"
"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
@@ -38,6 +38,10 @@ const (
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodeTimeout = 1 * time.Second
allocNodeInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
retryInterval = 100 * time.Millisecond
)

const (
@@ -55,6 +59,7 @@ type GroupManager struct {
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap

// store is the storage for keyspace group related information.
store endpoint.KeyspaceGroupStorage

@@ -67,26 +72,28 @@ type GroupManager struct {

policy balancer.Policy

// TODO: use a separate balancer for each user kind
// once we determine where the correspondence between TSO nodes and user kinds can be found.
nodesBalancer balancer.Balancer[string]
}

// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage, client *clientv3.Client, clusterID uint64) *GroupManager {
ctx, cancel := context.WithCancel(ctx)
key := discovery.TSOAddrsPath(clusterID)
key := discovery.TSOPath(clusterID)
groups := make(map[endpoint.UserKind]*indexedHeap)
for i := 0; i < int(endpoint.UserKindCount); i++ {
groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
return &GroupManager{
ctx: ctx,
cancel: cancel,
groups: groups,
store: store,
client: client,
tsoServiceKey: key,
tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/",
policy: defaultBalancerPolicy,
groups: groups,
}
}

@@ -125,17 +132,11 @@ func (m *GroupManager) Bootstrap() error {
m.groups[userKind].Put(group)
}

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
resp, err := etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err != nil {
return err
}
for _, item := range resp.Kvs {
m.nodesBalancer.Put(string(item.Value))
}
m.wg.Add(1)
go m.startWatchLoop(resp.Header.GetRevision())
go m.startWatchLoop()
}
return nil
}
@@ -146,49 +147,88 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) startWatchLoop(revision int64) {
func (m *GroupManager) startWatchLoop() {
defer m.wg.Done()
if err := m.watchServiceAddrs(revision); err != nil {
log.Error("watch service addresses failed", zap.Error(err))
}
}

func (m *GroupManager) watchServiceAddrs(revision int64) error {
ctx, cancel := context.WithCancel(m.ctx)
defer cancel()
var (
resp *clientv3.GetResponse
revision int64
err error
)
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision
for _, item := range resp.Kvs {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(item.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry", zap.Error(err))
continue
}
m.nodesBalancer.Put(s.ServiceAddr)
}
break
}
}
if err != nil {
log.Warn("failed to get tso service addrs from etcd", zap.Error(err))
}
for {
select {
case <-ctx.Done():
return nil
return
default:
}
watchChan := m.client.Watch(ctx, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey), clientv3.WithRev(revision))
for {
select {
case <-ctx.Done():
return nil
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, the watcher will watch again with the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
break
}
if wresp.Err() != nil {
log.Error("watch is canceled or closed",
zap.Int64("required-revision", revision),
zap.Error(wresp.Err()))
return wresp.Err()
nextRevision, err := m.watchServiceAddrs(ctx, revision)
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
zap.Int64("next-revision", nextRevision),
zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
zap.Error(err))
revision = nextRevision
time.Sleep(watchEtcdChangeRetryInterval)
}
}
}

func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (int64, error) {
watcher := clientv3.NewWatcher(m.client)
defer watcher.Close()
for {
WatchChan:
watchChan := watcher.Watch(ctx, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey), clientv3.WithRev(revision))
select {
case <-ctx.Done():
return revision, nil
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, the watcher will watch again with the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
goto WatchChan
}
if wresp.Err() != nil {
log.Error("watch is canceled or closed",
zap.Int64("required-revision", revision),
zap.Error(wresp.Err()))
return revision, wresp.Err()
}
for _, event := range wresp.Events {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(event.Kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry", zap.Error(err))
}
for _, event := range wresp.Events {
addr := string(event.Kv.Value)
switch event.Type {
case clientv3.EventTypePut:
m.nodesBalancer.Put(addr)
case clientv3.EventTypeDelete:
m.nodesBalancer.Delete(addr)
}
switch event.Type {
case clientv3.EventTypePut:
m.nodesBalancer.Put(s.ServiceAddr)
case clientv3.EventTypeDelete:
m.nodesBalancer.Delete(s.ServiceAddr)
}
}
}
@@ -199,22 +239,6 @@ func (m *GroupManager) watchServiceAddrs(revision int64) error {
func (m *GroupManager) CreateKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup) error {
m.Lock()
defer m.Unlock()
enableAllocate := true
failpoint.Inject("disableAllocate", func() {
enableAllocate = false
})
if enableAllocate {
for _, keyspaceGroup := range keyspaceGroups {
// TODO: consider the case that the node offline
members := m.AllocNodesForGroup(keyspaceGroup.Replica)
if len(members) == 0 {
// directly return error if no available node.
// It means that the number of nodes is reducing between the check of controller and the execution of this function.
return errNoAvailableNode
}
keyspaceGroup.Members = members
}
}
if err := m.saveKeyspaceGroups(keyspaceGroups, false); err != nil {
return err
}
@@ -294,10 +318,9 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
return ErrKeyspaceGroupExists
}
newKG := &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Keyspaces: keyspaceGroup.Keyspaces,
Replica: keyspaceGroup.Replica,
}
m.store.SaveKeyspaceGroup(txn, newKG)
}
@@ -401,30 +424,52 @@ func (m *GroupManager) GetNodesNum() int {
return len(m.nodesBalancer.GetAll())
}

// AllocNodesForGroup allocates nodes for the keyspace group.
// Note: the replica should be less than the number of nodes.
func (m *GroupManager) AllocNodesForGroup(replica int) []endpoint.KeyspaceGroupMember {
// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) {
ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout)
defer cancel()
ticker := time.NewTicker(allocNodeInterval)
defer ticker.Stop()
exists := make(map[string]struct{})
nodes := make([]endpoint.KeyspaceGroupMember, 0, replica)
for len(exists) < replica {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
addr := m.nodesBalancer.Next()
if addr == "" { // no node
return nil
if kg == nil {
return ErrKeyspaceGroupNotExists
}
if _, ok := exists[addr]; ok {
continue
exists := make(map[string]struct{})
for _, member := range kg.Members {
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
for len(exists) < replica {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
num := len(m.nodesBalancer.GetAll())
if num < replica || num == 0 { // double check
return errNoAvailableNode
}
addr := m.nodesBalancer.Next()
if addr == "" {
return errNoAvailableNode
}
if _, ok := exists[addr]; ok {
continue
}
exists[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{Address: addr})
}
exists[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{Address: addr})
kg.Members = nodes
m.store.SaveKeyspaceGroup(txn, kg)
return nil
})
if err != nil {
return nil, err
}
return nodes
return nodes, nil
}
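For reference, the watch loop added above follows the standard etcd pattern: seed state with a ranged Get, watch from the returned revision, and when etcd reports a compact revision, restart the watch from that revision instead of failing. The sketch below is a minimal standalone illustration of that pattern, not the PD implementation: the go.etcd.io v3 client import path, the example package name, and the handleEvent callback are assumptions for illustration.

package example

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// watchPrefix watches [key, endKey) starting at rev and forwards every event
// to handleEvent. When the requested revision has been compacted, it restarts
// the watch from the compact revision; when the watch is canceled or closed,
// it returns the revision to resume from along with the error.
func watchPrefix(ctx context.Context, cli *clientv3.Client, key, endKey string, rev int64,
	handleEvent func(*clientv3.Event)) (int64, error) {
	watcher := clientv3.NewWatcher(cli)
	defer watcher.Close()
	for {
		watchChan := watcher.Watch(ctx, key, clientv3.WithRange(endKey), clientv3.WithRev(rev))
		for wresp := range watchChan {
			if wresp.CompactRevision != 0 {
				// The watch fell behind the compaction window; resume from
				// the compact revision rather than returning an error.
				rev = wresp.CompactRevision
				break
			}
			if err := wresp.Err(); err != nil {
				return rev, err
			}
			for _, ev := range wresp.Events {
				handleEvent(ev)
				// Resume after the last observed modification on retry.
				rev = ev.Kv.ModRevision + 1
			}
		}
		select {
		case <-ctx.Done():
			return rev, ctx.Err()
		default:
			// The channel closed (for example after a compaction break);
			// loop and open a new watch at the updated revision.
		}
	}
}

In the manager above, the role of handleEvent is played by updating nodesBalancer from the decoded ServiceRegistryEntry on each Put or Delete event.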
3 changes: 0 additions & 3 deletions pkg/keyspace/tso_keyspace_group_test.go
@@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/mock/mockconfig"
@@ -42,7 +41,6 @@ func TestKeyspaceGroupTestSuite(t *testing.T) {
}

func (suite *keyspaceGroupTestSuite) SetupTest() {
suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/disableAllocate", "return(true)"))
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
@@ -53,7 +51,6 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
}

func (suite *keyspaceGroupTestSuite) TearDownTest() {
suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/disableAllocate"))
suite.cancel()
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/keyspace/util.go
@@ -43,9 +43,11 @@ var (
ErrKeyspaceExists = errors.New("keyspace already exists")
// ErrKeyspaceGroupExists indicates target keyspace group already exists.
ErrKeyspaceGroupExists = errors.New("keyspace group already exists")
errModifyDefault = errors.New("cannot modify default keyspace's state")
errIllegalOperation = errors.New("unknown operation")
errNoAvailableNode = errors.New("no available node")
// ErrKeyspaceGroupNotExists indicates target keyspace group does not exist.
ErrKeyspaceGroupNotExists = errors.New("keyspace group does not exist")
errModifyDefault = errors.New("cannot modify default keyspace's state")
errIllegalOperation = errors.New("unknown operation")
errNoAvailableNode = errors.New("no available node")

// stateTransitionTable lists all allowed next state for the given current state.
// Note that transit from any state to itself is allowed for idempotence.
4 changes: 2 additions & 2 deletions pkg/mcs/discovery/key_path.go
@@ -32,7 +32,7 @@ func discoveryPath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
}

// TSOAddrsPath returns the path to store TSO addresses.
func TSOAddrsPath(clusterID uint64) string {
// TSOPath returns the path to store TSO addresses.
func TSOPath(clusterID uint64) string {
return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/"
}
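A prefix returned by TSOPath is typically paired with clientv3.GetPrefixRangeEnd to scan every registered TSO node, which is how the group manager seeds its balancer before starting the watch. Below is a hedged sketch of that lookup, assuming the etcd v3 client import path and the ServiceRegistryEntry JSON shape shown in this diff; listTSOAddrs and the example package are illustrative names, not PD API.

package example

import (
	"context"
	"encoding/json"

	clientv3 "go.etcd.io/etcd/client/v3"

	"github.com/tikv/pd/pkg/mcs/discovery"
)

// listTSOAddrs scans every key under the TSO discovery prefix for the given
// cluster and returns the registered service addresses.
func listTSOAddrs(ctx context.Context, client *clientv3.Client, clusterID uint64) ([]string, error) {
	key := discovery.TSOPath(clusterID)
	endKey := clientv3.GetPrefixRangeEnd(key)
	resp, err := client.Get(ctx, key, clientv3.WithRange(endKey))
	if err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		entry := &discovery.ServiceRegistryEntry{}
		if err := json.Unmarshal(kv.Value, entry); err != nil {
			continue // skip malformed registrations rather than failing the scan
		}
		addrs = append(addrs, entry.ServiceAddr)
	}
	return addrs, nil
}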
2 changes: 0 additions & 2 deletions pkg/storage/endpoint/tso_keyspace_group.go
@@ -86,8 +86,6 @@ type KeyspaceGroup struct {
// KeyspaceLookupTable is for fast lookup if a given keyspace belongs to this keyspace group.
// It's not persisted and will be built when loading from storage.
KeyspaceLookupTable map[uint32]struct{} `json:"-"`
// Replica is the tso replica count of the keyspace group.
Replica int `json:"replica"`
}

// KeyspaceGroupStorage is the interface for keyspace group storage.