Skip to content

Commit

Permalink
Fix test failure
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Apr 3, 2023
1 parent 92d67c1 commit a65c456
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 32 deletions.
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace grops terminated
load keyspace groups terminated
'''

["ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace grops timeout
load keyspace groups timeout
'''

["PD:ErrEncryptionKMS"]
Expand Down
18 changes: 15 additions & 3 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,33 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {

// Close this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")
kgm.cancel()
kgm.wg.Wait()

kgm.closeKeyspaceGroups()
log.Info("keyspace group manager closed")
}

func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() {
kgm.mu.Lock()
defer kgm.mu.Unlock()

log.Info("closing all keyspace groups")

wg := sync.WaitGroup{}
for i := range kgm.ams {
if mgr := kgm.ams[i].Load(); mgr != nil {
mgr.close()
if am := kgm.ams[i].Load(); am != nil {
wg.Add(1)
go func(am *AllocatorManager) {
defer wg.Done()
am.close()
log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", am.ksgID))
}(am)
}
}
wg.Wait()

log.Info("All keyspace groups closed")
}

func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel context.CancelFunc, done chan struct{}) {
Expand Down
80 changes: 53 additions & 27 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package tso
import (
"context"
"encoding/json"
"math/rand"
"path"
"strings"
"sync"
Expand Down Expand Up @@ -117,44 +118,48 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() {
// TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment.
func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsAssignment() {
re := suite.Require()
maxCountInUse := int(mcsutils.MaxKeyspaceGroupCountInUse)
// maxCountInUse := int(1024)
// Test the loading of empty keyspace group assignment.
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 0, 0)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 0, 0, 100)
// Test the loading of single keyspace group assignment.
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 1, 0)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 1, 0, 100)
// Test the loading of multiple keyspace group assignment.
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 3, 0)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg,
int(mcsutils.MaxKeyspaceGroupCountInUse-1), 0)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg,
int(mcsutils.MaxKeyspaceGroupCountInUse), 0)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 3, 0, 100)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse-1, 0, 10)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse, 0, 10)
// Test the loading of the keyspace group assignment which exceeds the maximum
// keyspace group count. In this case, the manager should only load/serve the
// first MaxKeyspaceGroupCountInUse keyspace groups and ignore the rest
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg,
int(mcsutils.MaxKeyspaceGroupCountInUse+1), 0)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse+1, 0, 10)
}

// TestLoadWithDifferentBatchSize tests the loading of the keyspace group assignment with the different batch size.
func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() {
re := suite.Require()

maxCount := mcsutils.MaxKeyspaceGroupCountInUse
batchSize := int64(17)
maxCount := uint32(1024)
params := []struct {
batchSize int64
count int
batchSize int64
count int
probabilityAssignToMe int // percentage of assigning keyspace groups to this host/pod
}{
{batchSize: 1, count: 1},
{batchSize: 2, count: int(maxCount / 10)},
{batchSize: 7, count: int(maxCount / 10)},
{batchSize: defaultLoadKeyspaceGroupsBatchSize, count: int(defaultLoadKeyspaceGroupsBatchSize)},
{batchSize: int64(maxCount / 13), count: int(maxCount / 13)},
{batchSize: int64(maxCount), count: int(maxCount / 13)},
{batchSize: 1, count: 1, probabilityAssignToMe: 100},
{batchSize: 2, count: int(maxCount / 10), probabilityAssignToMe: 100},
{batchSize: 7, count: int(maxCount / 10), probabilityAssignToMe: 100},
{batchSize: batchSize, count: int(batchSize), probabilityAssignToMe: 50},
{batchSize: int64(maxCount / 13), count: int(maxCount / 13), probabilityAssignToMe: 50},
{batchSize: int64(maxCount), count: int(maxCount / 13), probabilityAssignToMe: 10},
}

for _, param := range params {
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count-1, param.batchSize)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count, param.batchSize)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count+1, param.batchSize)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg,
param.count-1, param.batchSize, param.probabilityAssignToMe)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg,
param.count, param.batchSize, param.probabilityAssignToMe)
runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg,
param.count+1, param.batchSize, param.probabilityAssignToMe)
}
}

Expand All @@ -163,25 +168,35 @@ func runTestLoadKeyspaceGroupsAssignment(
ctx context.Context, re *require.Assertions, etcdClient *clientv3.Client, cfg *TestServiceConfig,
numberOfKeypaceGroupsToAdd int,
loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value
probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod
) {
ids := make(map[uint32]bool, 0)
mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, 0, loadKeyspaceGroupsBatchSize)
re.NotNil(mgr)
defer mgr.Close()

const step = 30
keyspaceGroupsAdded := sync.Map{}
wg := sync.WaitGroup{}
for i := 0; i < numberOfKeypaceGroupsToAdd; i += step {
wg.Add(1)
go func(startID int) {
defer wg.Done()

endID := startID + step
if endID > numberOfKeypaceGroupsToAdd {
endID = numberOfKeypaceGroupsToAdd
}

randomGen := rand.New(rand.NewSource(time.Now().UnixNano()))
for j := startID; j < endID; j++ {
assignToMe := false
if randomGen.Intn(100) < probabilityAssignToMe {
assignToMe = true
keyspaceGroupsAdded.Store(uint32(j), struct{}{})
}
addKeyspaceGroupAssignment(
ctx, etcdClient,
ctx, etcdClient, assignToMe,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j))
}
}(i)
Expand All @@ -193,12 +208,16 @@ func runTestLoadKeyspaceGroupsAssignment(
if numberOfKeypaceGroupsToAdd > len(mgr.ams) {
numberOfKeypaceGroupsToAdd = len(mgr.ams)
}
for i := 0; i < numberOfKeypaceGroupsToAdd; i++ {
ids[uint32(i)] = true
}
// Set the expected result.
keyspaceGroupsAdded.Range(func(key, _ interface{}) bool {
ids[key.(uint32)] = true
return true
})

err := mgr.Initialize(true)
re.NoError(err)

// Verify the keyspace group assignment.
re.True(verifyKeyspaceGroupAssignment(ids, mgr))
}

Expand Down Expand Up @@ -226,11 +245,18 @@ func newUniqueKeyspaceGroupManager(

// addKeyspaceGroupAssignment adds a keyspace group assignment to etcd.
func addKeyspaceGroupAssignment(
ctx context.Context, etcdClient *clientv3.Client, rootPath, svcAddr string, id uint32,
ctx context.Context, etcdClient *clientv3.Client,
assignToMe bool, rootPath, svcAddr string, id uint32,
) error {
var location string
if assignToMe {
location = svcAddr
} else {
location = uuid.NewString()
}
group := &endpoint.KeyspaceGroup{
ID: id,
Members: []endpoint.KeyspaceGroupMember{{Location: svcAddr}},
Members: []endpoint.KeyspaceGroupMember{{Location: location}},
Keyspaces: []uint32{id},
}

Expand Down

0 comments on commit a65c456

Please sign in to comment.