Skip to content

Commit

Permalink
Merge branch 'master' into fix_keyspace_partrol
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored May 5, 2023
2 parents 2e1f8d1 + 09e6531 commit 40b99b2
Show file tree
Hide file tree
Showing 18 changed files with 446 additions and 142 deletions.
26 changes: 24 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,35 @@ func NewClientWithKeyspaceName(ctx context.Context, keyspace string, svrAddrs []
c.cancel()
return nil, err
}
if err := c.initRetry(c.loadKeyspaceMeta, keyspace); err != nil {
return nil, err
}
return c, nil
}

func (c *client) initRetry(f func(s string) error, str string) error {
var err error
for i := 0; i < c.option.maxRetryTimes; i++ {
if err = f(str); err == nil || strings.Contains(err.Error(), "ENTRY_NOT_FOUND") {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(time.Second):
}
}
return errors.WithStack(err)
}

func (c *client) loadKeyspaceMeta(keyspace string) error {
keyspaceMeta, err := c.LoadKeyspace(context.TODO(), keyspace)
// Here we ignore ENTRY_NOT_FOUND error and it will set the keyspaceID to 0.
if err != nil && !strings.Contains(err.Error(), "ENTRY_NOT_FOUND") {
return nil, err
return err
}
c.keyspaceID = keyspaceMeta.GetId()
return c, nil
return nil
}

func (c *client) setup() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestLeadership(t *testing.T) {
// Check the lease.
lease1 := leadership1.getLease()
re.NotNil(lease1)
lease2 := leadership1.getLease()
lease2 := leadership2.getLease()
re.NotNil(lease2)

re.True(lease1.IsExpired())
Expand Down
26 changes: 18 additions & 8 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,21 +475,30 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}

changed := false

switch mutation {
case opAdd:
if !slice.Contains(kg.Keyspaces, keyspaceID) {
kg.Keyspaces = append(kg.Keyspaces, keyspaceID)
changed = true
}
case opDelete:
if slice.Contains(kg.Keyspaces, keyspaceID) {
kg.Keyspaces = slice.Remove(kg.Keyspaces, keyspaceID)
lenOfKeyspaces := len(kg.Keyspaces)
kg.Keyspaces = slice.Remove(kg.Keyspaces, keyspaceID)
if lenOfKeyspaces != len(kg.Keyspaces) {
changed = true
}
}
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
return err
}

m.groups[userKind].Put(kg)
if changed {
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
return err
}

m.groups[userKind].Put(kg)
}
return nil
}

Expand Down Expand Up @@ -528,8 +537,9 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
updateNew = true
}

if slice.Contains(oldKG.Keyspaces, keyspaceID) {
oldKG.Keyspaces = slice.Remove(oldKG.Keyspaces, keyspaceID)
lenOfOldKeyspaces := len(oldKG.Keyspaces)
oldKG.Keyspaces = slice.Remove(oldKG.Keyspaces, keyspaceID)
if lenOfOldKeyspaces != len(oldKG.Keyspaces) {
updateOld = true
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
return member, nil
}

// ResignPrimary resigns the primary of the given keyspace and keyspace group.
func (s *Server) ResignPrimary() error {
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
// ResignPrimary resigns the primary of the given keyspace.
func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() {
oc.SetOperator(op1)

suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator", "return(true)"))
defer suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator"))

var wg sync.WaitGroup
wg.Add(2)
Expand Down
11 changes: 6 additions & 5 deletions pkg/slice/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ func Contains[T comparable](slice []T, value T) bool {

// Remove removes the value from the slice.
func Remove[T comparable](slice []T, value T) []T {
for i := 0; i < len(slice); i++ {
if slice[i] == value {
slice = append(slice[:i], slice[i+1:]...)
i--
i, j := 0, 0
for ; i < len(slice); i++ {
if slice[i] != value {
slice[j] = slice[i]
j++
}
}
return slice
return slice[:j]
}
25 changes: 24 additions & 1 deletion pkg/slice/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestSliceContains(t *testing.T) {
re.False(slice.Contains(is, int64(4)))
}

func TestSliceRemove(t *testing.T) {
func TestSliceRemoveGenericTypes(t *testing.T) {
t.Parallel()
re := require.New(t)
ss := []string{"a", "b", "c"}
Expand All @@ -75,3 +75,26 @@ func TestSliceRemove(t *testing.T) {
is = slice.Remove(is, 1)
re.Equal([]int64{2, 3}, is)
}

func TestSliceRemove(t *testing.T) {
t.Parallel()
re := require.New(t)

is := []int64{}
is = slice.Remove(is, 1)
re.Equal([]int64{}, is)

is = []int64{1}
is = slice.Remove(is, 2)
re.Equal([]int64{1}, is)
is = slice.Remove(is, 1)
re.Equal([]int64{}, is)

is = []int64{1, 2, 3}
is = slice.Remove(is, 1)
re.Equal([]int64{2, 3}, is)

is = []int64{1, 1, 1}
is = slice.Remove(is, 1)
re.Equal([]int64{}, is)
}
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID(
return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil
}

// GetElectionMember returns the election member of the given keyspace group
// GetElectionMember returns the election member of the keyspace group serving the given keyspace.
func (kgm *KeyspaceGroupManager) GetElectionMember(
keyspaceID, keyspaceGroupID uint32,
) (ElectionMember, error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestEtcdClientSync(t *testing.T) {
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
}

func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
Expand All @@ -251,13 +251,13 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))
err = checkEtcdWithHangLeader(t)
re.NoError(err)
require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))

// Test with disable check.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)"))
err = checkEtcdWithHangLeader(t)
re.Error(err)
require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
}

func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mcs

import (
"context"
"fmt"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
}

// ResignPrimary resigns the primary TSO server.
func (tc *TestTSOCluster) ResignPrimary() {
tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID)
if primaryServer == nil {
return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID)
}
return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID)
}

// GetPrimary returns the primary TSO server.
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
// GetPrimaryServer returns the primary TSO server of the given keyspace
func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (suite *keyspaceGroupTestSuite) TearDownTest() {
func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
// add three nodes.
nodes := make(map[string]bs.Server)
for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount+2; i++ {
for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount+1; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
Expand Down Expand Up @@ -113,7 +113,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
oldMembers[member.Address] = struct{}{}
}

// alloc node update to 2.
// alloc node update to 3.
params.Replica = utils.KeyspaceGroupDefaultReplicaCount + 1
got, code = suite.tryAllocNodesForKeyspaceGroup(id, params)
suite.Equal(http.StatusOK, code)
Expand Down
87 changes: 85 additions & 2 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
)

var once sync.Once
Expand All @@ -48,13 +49,25 @@ func InitLogger(cfg *tso.Config) (err error) {
return err
}

// SetupClientWithKeyspace creates a TSO client for test.
func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
// SetupClientWithDefaultKeyspaceName creates a TSO client with default keyspace name for test.
func SetupClientWithDefaultKeyspaceName(
ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption,
) pd.Client {
cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test.
func SetupClientWithKeyspaceID(
ctx context.Context, re *require.Assertions,
keyspaceID uint32, endpoints []string, opts ...pd.ClientOption,
) pd.Client {
cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
cfg := rm.NewConfig()
Expand Down Expand Up @@ -137,3 +150,73 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error {
}
return errors.WithStack(err)
}

// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces.
func CheckMultiKeyspacesTSO(
ctx context.Context, re *require.Assertions,
clients []pd.Client, parallelAct func(),
) {
ctx, cancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
wg.Add(len(clients))

for _, client := range clients {
go func(cli pd.Client) {
defer wg.Done()
var ts, lastTS uint64
for {
select {
case <-ctx.Done():
// Make sure the lastTS is not empty
re.NotEmpty(lastTS)
return
default:
}
physical, logical, err := cli.GetTS(ctx)
// omit the error check since there are many kinds of errors
if err != nil {
continue
}
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}(client)
}

wg.Add(1)
go func() {
defer wg.Done()
parallelAct()
cancel()
}()

wg.Wait()
}

// WaitForMultiKeyspacesTSOAvailable waits for the given keyspaces being served by the tso server side
func WaitForMultiKeyspacesTSOAvailable(
ctx context.Context, re *require.Assertions,
keyspaceIDs []uint32, backendEndpoints []string,
) []pd.Client {
wg := sync.WaitGroup{}
wg.Add(len(keyspaceIDs))

clients := make([]pd.Client, 0, len(keyspaceIDs))
for _, keyspaceID := range keyspaceIDs {
cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints)
re.NotNil(cli)
clients = append(clients, cli)

go func() {
defer wg.Done()
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(ctx)
return err == nil
})
}()
}

wg.Wait()
return clients
}
Loading

0 comments on commit 40b99b2

Please sign in to comment.