Skip to content

Commit

Permalink
Added unitest for watching and dynamically applying keyspace group me…
Browse files Browse the repository at this point in the history
…ta changes

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Apr 4, 2023
1 parent 96ec000 commit 584e550
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 60 deletions.
109 changes: 51 additions & 58 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -153,12 +152,6 @@ func NewKeyspaceGroupManager(

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
if kgm.legacySvcStorage == nil {
return errors.New("failed to create storage endpoint for keyspace group metadata")
}

// Initialize the default keyspace group if not loading from storage
if !loadFromStorage {
group := &endpoint.KeyspaceGroup{
Expand Down Expand Up @@ -243,13 +236,12 @@ func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel c
func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, error) {
var (
// The start revision for watching keyspace group membership/distribution change
watchStartRevision int64
groups []*endpoint.KeyspaceGroup
more bool
err error
keyspaceGroupsLoaded uint32
keyspaceGroupsApplied uint32
revision int64
watchStartRevision int64
groups []*endpoint.KeyspaceGroup
more bool
err error
keyspaceGroupsLoaded uint32
revision int64
)

// Load all keyspace groups from etcd and apply the ones assigned to this tso service.
Expand All @@ -273,20 +265,15 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err
default:
}

if kgm.isAssignedToMe(group) {
keyspaceGroupsApplied++
kgm.updateKeyspaceGroup(group)
}
kgm.updateKeyspaceGroup(group)
}

if !more {
break
}
}

log.Info("loaded keyspace groups",
zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded),
zap.Uint32("keyspace-groups-applied", keyspaceGroupsApplied))
log.Info("loaded keyspace groups", zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded))
return watchStartRevision, nil
}

Expand Down Expand Up @@ -394,7 +381,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
log.Warn("failed to unmarshal keyspace group",
zap.Uint32("keysapce-group-id", id),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
} else if kgm.isAssignedToMe(group) {
} else {
kgm.updateKeyspaceGroup(group)
}
case clientv3.EventTypeDelete:
Expand All @@ -414,7 +401,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
for _, member := range group.Members {
if len(member.Address) > 0 && member.Address == kgm.tsoServiceID.ServiceAddr {
if member.Address == kgm.tsoServiceID.ServiceAddr {
return true
}
}
Expand All @@ -430,45 +417,51 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
return
}

if kgm.ams[group.ID].Load() != nil {
log.Info("keyspace group already initialized, so update meta only",
zap.Uint32("keyspace-group-id", group.ID))
kgm.ksgs[group.ID].Store(group)
return
}

uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, group.ID)
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("participant-name", uniqueName),
zap.Uint64("participant-id", uniqueID))
assignedToMe := kgm.isAssignedToMe(group)
if assignedToMe {
if kgm.ams[group.ID].Load() != nil {
log.Info("keyspace group already initialized, so update meta only",
zap.Uint32("keyspace-group-id", group.ID))
kgm.ksgs[group.ID].Store(group)
return
}

participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)),
primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, group.ID)
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("participant-name", uniqueName),
zap.Uint64("participant-id", uniqueID))

participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)),
primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())

// Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp.
var (
tsRootPath string
storage *endpoint.StorageEndpoint
)
if group.ID == mcsutils.DefaultKeySpaceGroupID {
tsRootPath = kgm.legacySvcRootPath
storage = kgm.legacySvcStorage
} else {
tsRootPath = kgm.tsoSvcRootPath
storage = kgm.tsoSvcStorage
}

// Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp.
var (
tsRootPath string
storage *endpoint.StorageEndpoint
)
if group.ID == mcsutils.DefaultKeySpaceGroupID {
tsRootPath = kgm.legacySvcRootPath
storage = kgm.legacySvcStorage
kgm.ams[group.ID].Store(
NewAllocatorManager(
kgm.ctx, true, group.ID, participant, tsRootPath, storage,
kgm.cfg.IsLocalTSOEnabled(), kgm.cfg.GetTSOSaveInterval(),
kgm.cfg.GetTSOUpdatePhysicalInterval(), kgm.cfg.GetLeaderLease(),
kgm.cfg.GetTLSConfig(), kgm.maxResetTSGap))
kgm.ksgs[group.ID].Store(group)
} else {
tsRootPath = kgm.tsoSvcRootPath
storage = kgm.tsoSvcStorage
// Not assigned to me. If this host/pod owns this keyspace group, it should resign.
kgm.deleteKeyspaceGroup(group.ID)
}

kgm.ams[group.ID].Store(
NewAllocatorManager(
kgm.ctx, true, group.ID, participant, tsRootPath, storage,
kgm.cfg.IsLocalTSOEnabled(), kgm.cfg.GetTSOSaveInterval(),
kgm.cfg.GetTSOUpdatePhysicalInterval(), kgm.cfg.GetLeaderLease(),
kgm.cfg.GetTLSConfig(), kgm.maxResetTSGap))
kgm.ksgs[group.ID].Store(group)
}

// deleteKeyspaceGroup deletes the given keyspace group.
Expand Down
136 changes: 134 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"path"
"reflect"
"sort"
"strconv"
"strings"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/testutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/goleak"
)

Expand Down Expand Up @@ -166,6 +168,101 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() {
}
}

// TestWatchAndDynamicallyApplyChanges tests the keyspace group manager watch and dynamically apply
// keyspace groups' membership/distribution meta changes.
func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges() {
re := suite.Require()

// Start with the empty keyspace group assignment.
mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 0)
re.NotNil(mgr)
defer mgr.Close()
err := mgr.Initialize(true)
re.NoError(err)

rootPath := mgr.legacySvcRootPath
svcAddr := mgr.tsoServiceID.ServiceAddr

// Initialize PUT/DELETE events
events := []*etcdEvent{}
// Assign keyspace group 0 to this host/pod/keyspace-group-manager.
// final result: [0]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 0, []string{svcAddr})
// Assign keyspace group 1 to this host/pod/keyspace-group-manager.
// final result: [0,1]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 1, []string{"unknown", svcAddr})
// Assign keyspace group 2 to other host/pod/keyspace-group-manager.
// final result: [0,1]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{"unknown"})
// Assign keyspace group 3 to this host/pod/keyspace-group-manager.
// final result: [0,1,3]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{svcAddr})
// Delete keyspace group 0
// final result: [1,3]
events = generateKeyspaceGroupEvent(events, mvccpb.DELETE, 0, []string{})
// Put keyspace group 4 which doesn't belong to anyone.
// final result: [1,3]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{})
// Put keyspace group 5 which doesn't belong to anyone.
// final result: [1,3]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 5, []string{})
// Assign keyspace group 2 to this host/pod/keyspace-group-manager.
// final result: [1,2,3]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{svcAddr})
// Reassign keyspace group 3 to no one.
// final result: [1,2]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{})
// Reassign keyspace group 4 to this host/pod/keyspace-group-manager.
// final result: [1,2,4]
events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{svcAddr})

// Eventually, this keyspace groups manager is expected to serve the following keyspace groups.
idsExpected := []int{1, 2, 4}

// Apply the keyspace group assignment change events to etcd.
for _, event := range events {
switch event.eventType {
case mvccpb.PUT:
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
case mvccpb.DELETE:
err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID)
re.NoError(err)
}
}

// Verify the keyspace group assignment.
testutil.Eventually(re, func() bool {
idsAssigned := collectAssignedKeyspaceGroupIDs(re, mgr)
return reflect.DeepEqual(idsExpected, idsAssigned)
})
}

type etcdEvent struct {
eventType mvccpb.Event_EventType
ksg *endpoint.KeyspaceGroup
}

func generateKeyspaceGroupEvent(
events []*etcdEvent, eventType mvccpb.Event_EventType, id uint32, addrs []string,
) []*etcdEvent {
members := []endpoint.KeyspaceGroupMember{}
for _, addr := range addrs {
members = append(members, endpoint.KeyspaceGroupMember{Address: addr})
}

return append(events,
&etcdEvent{
eventType: eventType,
ksg: &endpoint.KeyspaceGroup{
ID: id,
Members: members,
Keyspaces: []uint32{id},
},
},
)
}

// runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment.
func runTestLoadKeyspaceGroupsAssignment(
ctx context.Context,
Expand Down Expand Up @@ -246,6 +343,38 @@ func newUniqueKeyspaceGroupManager(
return keyspaceGroupManager
}

// putKeyspaceGroupToEtcd puts a keyspace group to etcd.
func putKeyspaceGroupToEtcd(
ctx context.Context, etcdClient *clientv3.Client,
rootPath string, group *endpoint.KeyspaceGroup,
) error {
key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(group.ID)}, "/")
value, err := json.Marshal(group)
if err != nil {
return err
}

if _, err := etcdClient.Put(ctx, key, string(value)); err != nil {
return err
}

return nil
}

// deleteKeyspaceGroupInEtcd deletes a keyspace group in etcd.
func deleteKeyspaceGroupInEtcd(
ctx context.Context, etcdClient *clientv3.Client,
rootPath string, id uint32,
) error {
key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/")

if _, err := etcdClient.Delete(ctx, key); err != nil {
return err
}

return nil
}

// addKeyspaceGroupAssignment adds a keyspace group assignment to etcd.
func addKeyspaceGroupAssignment(
ctx context.Context, etcdClient *clientv3.Client,
Expand Down Expand Up @@ -287,8 +416,11 @@ func collectAssignedKeyspaceGroupIDs(re *require.Assertions, ksgMgr *KeyspaceGro
re.NotNil(am, fmt.Sprintf("ksg is not nil but am is nil for id %d", i))
re.Equal(i, int(am.ksgID))
re.Equal(i, int(ksg.ID))
if ksg.Members[0].Address == ksgMgr.tsoServiceID.ServiceAddr {
ids = append(ids, i)
for _, m := range ksg.Members {
if m.Address == ksgMgr.tsoServiceID.ServiceAddr {
ids = append(ids, i)
break
}
}
}
}
Expand Down

0 comments on commit 584e550

Please sign in to comment.