Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: make TestSplitKeyspaceGroup stable #6584

Merged
merged 8 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 46 additions & 14 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -51,9 +52,11 @@ const (

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client
clusterID uint64

sync.RWMutex
// groups is the cache of keyspace group related information.
Expand Down Expand Up @@ -90,19 +93,19 @@ func NewKeyspaceGroupManager(
cancel: cancel,
store: store,
groups: groups,
client: client,
clusterID: clusterID,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if client != nil {
m.initTSONodesWatcher(client, clusterID)
m.wg.Add(2)
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.wg.Add(1)
go m.tsoNodesWatcher.StartWatchLoop()
go m.allocNodesToAllKeyspaceGroups()
}

return m
}

Expand Down Expand Up @@ -137,6 +140,11 @@ func (m *GroupManager) Bootstrap() error {
m.groups[userKind].Put(group)
}

// It will only alloc node when the group manager is on API leader.
if m.client != nil {
m.wg.Add(1)
go m.allocNodesToAllKeyspaceGroups()
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

Expand Down Expand Up @@ -338,11 +346,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
}
if oldKG.IsSplitting() {
newKG.SplitState = &endpoint.SplitState{
SplitSource: oldKG.SplitState.SplitSource,
}
}
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
Expand All @@ -352,6 +355,27 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
})
}

// saveGroupWithUpdateKeyspace will try to save the given keyspace groups into the storage.
// It only update the keyspace field for the keyspace group.
func (m *GroupManager) saveGroupWithUpdateKeyspace(keyspaceGroup *endpoint.KeyspaceGroup) error {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
oldKG, err := m.store.LoadKeyspaceGroup(txn, keyspaceGroup.ID)
if err != nil {
return err
}
if oldKG.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
newKG := oldKG
newKG.Keyspaces = keyspaceGroup.Keyspaces
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
}
return nil
})
}

// GetKeyspaceConfigByKind returns the keyspace config for the given user kind.
func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[string]string, error) {
// when server is not in API mode, we don't need to return the keyspace config
Expand Down Expand Up @@ -380,6 +404,8 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind)
return config, nil
}

var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
Expand All @@ -391,6 +417,12 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
return err
}

failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
failpointOnce.Do(func() {
addrs := val.(string)
m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
})
})
m.Lock()
defer m.Unlock()
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
Expand Down Expand Up @@ -422,7 +454,7 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
}

if changed {
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
if err := m.saveGroupWithUpdateKeyspace(kg); err != nil {
return err
}

Expand Down
49 changes: 47 additions & 2 deletions tests/pdctl/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestSplitKeyspaceGroup(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
keyspaces := make([]string, 0)
for i := 0; i < 500; i++ {
for i := 0; i < 129; i++ { // 128 is the default max txn ops limit in etcd.
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) {
Expand Down Expand Up @@ -126,8 +126,53 @@ func TestSplitKeyspaceGroup(t *testing.T) {
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
return strings.Contains(string(output), "Success")
}, testutil.WithWaitFor(20*time.Second))
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}

func TestExternalAllocNodeWhenStart(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// external alloc node for keyspace group, when keyspace manager update keyspace info to keyspace group
// we hope the keyspace group can be updated correctly.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/externalAllocNode", `return("127.0.0.1:2379,127.0.0.1:2380")`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
keyspaces := make([]string, 0)
for i := 0; i < 10; i++ {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

cmd := pdctlCmd.GetRootCmd()

time.Sleep(2 * time.Second)
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

// check keyspace group information.
defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID)
args := []string{"-u", pdAddr, "keyspace-group"}
testutil.Eventually(re, func() bool {
output, err := pdctl.ExecuteCommand(cmd, append(args, defaultKeyspaceGroupID)...)
re.NoError(err)
var keyspaceGroup endpoint.KeyspaceGroup
err = json.Unmarshal(output, &keyspaceGroup)
re.NoError(err)
return len(keyspaceGroup.Keyspaces) == len(keyspaces)+1 && len(keyspaceGroup.Members) == 2
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/externalAllocNode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}
8 changes: 2 additions & 6 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,8 @@ func TestScheduler(t *testing.T) {
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec([]string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result)
return len(result) != 0
}, testutil.WithTickInterval(50*time.Millisecond))

testutil.Eventually(re, func() bool {
return result["status"] == "paused" && result["summary"] == ""
}, testutil.WithTickInterval(50*time.Millisecond))
return len(result) != 0 && result["status"] == "paused" && result["summary"] == ""
}, testutil.WithWaitFor(30*time.Second))

mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"})
mustExec([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
Expand Down