Skip to content

Commit

Permalink
mcs: fix the members field is null (#6518)
Browse files Browse the repository at this point in the history
close #6519

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored May 25, 2023
1 parent f9534ff commit 89d2c83
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 9 deletions.
9 changes: 7 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,9 @@ func (manager *Manager) LoadKeyspaceByID(spaceID uint32) (*keyspacepb.KeyspaceMe
}
return nil
})
meta.Id = spaceID
if meta != nil {
meta.Id = spaceID
}
return meta, err
}

Expand Down Expand Up @@ -671,8 +673,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
)
}()
for moreToPatrol {
var defaultKeyspaceGroup *endpoint.KeyspaceGroup
err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
var err error
defaultKeyspaceGroup, err = manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
Expand Down Expand Up @@ -752,6 +756,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if err != nil {
return err
}
manager.kgm.groups[endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)].Put(defaultKeyspaceGroup)
// If all keyspaces in the current batch are assigned, update the next start ID.
manager.nextPatrolStartID = nextStartID
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/balancer"
"github.com/tikv/pd/pkg/mcs/discovery"
Expand Down Expand Up @@ -149,6 +150,10 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
failpoint.Inject("acceleratedAllocNodes", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond * 100)
})
defer ticker.Stop()
for {
select {
Expand All @@ -162,7 +167,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
log.Error("failed to load the all keyspace group", zap.Error(err))
log.Error("failed to load all keyspace groups", zap.Error(err))
continue
}
withError := false
Expand All @@ -171,7 +176,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Error(err))
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
continue
}
group.Members = nodes
Expand Down Expand Up @@ -626,9 +631,12 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
defer cancel()
ticker := time.NewTicker(allocNodesInterval)
defer ticker.Stop()

var kg *endpoint.KeyspaceGroup
nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount)
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
Expand Down Expand Up @@ -672,7 +680,8 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
if err != nil {
return nil, err
}
log.Info("alloc nodes for keyspace group", zap.Uint32("id", id), zap.Reflect("nodes", nodes))
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
log.Info("alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", id), zap.Reflect("nodes", nodes))
return nodes, nil
}

Expand Down
34 changes: 33 additions & 1 deletion tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/testutil"
"github.com/tikv/pd/pkg/election"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
Expand Down Expand Up @@ -425,3 +426,34 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
cancel()
wg.Wait()
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
re.Equal(uint32(0), kg.ID)
re.Equal([]uint32{0}, kg.Keyspaces)
re.False(kg.IsSplitting())
// wait for finishing alloc nodes
testutil.Eventually(re, func() bool {
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
return len(kg.Members) == 2
})
testConfig := map[string]string{
"config": "1",
"tso_keyspace_group_id": "0",
"user_kind": "basic",
}
handlersutil.MustCreateKeyspace(re, suite.pdLeaderServer, &handlers.CreateKeyspaceParams{
Name: "test_keyspace",
Config: testConfig,
})
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
testutil.Eventually(re, func() bool {
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
return len(kg.Members) == 2
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}
2 changes: 1 addition & 1 deletion tests/server/apiv2/handlers/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *tests.TestServer, cou
Name: fmt.Sprintf("test_keyspace_%d", i),
Config: testConfig,
}
resultMeta[i] = mustCreateKeyspace(re, server, createRequest)
resultMeta[i] = MustCreateKeyspace(re, server, createRequest)
}
return resultMeta
}
Expand Down
3 changes: 2 additions & 1 deletion tests/server/apiv2/handlers/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func sendUpdateStateRequest(re *require.Assertions, server *tests.TestServer, na
return true, meta.KeyspaceMeta
}

func mustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta {
// MustCreateKeyspace creates a keyspace with HTTP API.
func MustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta {
data, err := json.Marshal(request)
re.NoError(err)
httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspacesPrefix, bytes.NewBuffer(data))
Expand Down

0 comments on commit 89d2c83

Please sign in to comment.