Skip to content

Commit

Permalink
Add filter and batch handling when loading keyspace groups meta from s…
Browse files Browse the repository at this point in the history
…torage

Fix root path for LoadTimestamp/SaveTimestamp

Separate initial keyspace groups assignment loading from online dynamic changes.

Implement loadKeyspaceGroup() which also returns revision.

Changed the timeout mechanism for watching the progress of loading and initializing the keyspace groups assignment

Watch keyspace group membership/distribution meta change and dynamically apply the change

Add TestLoadInitialKeyspaceGroupsAssignment and TestExtractKeyspaceGroupIDFromPath

Add test cases for loading keyspace group assignment

Add retry if there is temporary failure when loading keyspace group assignment from etcd

Removed the mutex.

The original purpose was to prevent concurrent addition/removal of keyspace groups
during critical periods such as service shutdown and online keyspace group changes; the former requires
snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is
added/initialized after that. Now we have ensured that is the case by always cancelling the watch loop before closing keyspace groups in Close().

Added unit tests for watching and dynamically applying keyspace group meta changes

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Apr 6, 2023
1 parent 9aa3e97 commit de484bc
Show file tree
Hide file tree
Showing 15 changed files with 974 additions and 114 deletions.
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,9 @@ func IsLeaderChange(err error) bool {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) || strings.Contains(errMsg, errs.MismatchLeaderErr)
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

func trimHTTPPrefix(str string) string {
Expand Down
14 changes: 11 additions & 3 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ import (
)

const (
// NotLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
RetryTimeoutErr = "retry timeout"
// NotServedErr indicates a TSO node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
RetryTimeoutErr = "retry timeout"
)

// client errors
Expand Down
20 changes: 20 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
'''

["ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace groups timeout
'''

["PD:ErrEncryptionKMS"]
error = '''
KMS error
Expand Down Expand Up @@ -731,11 +741,21 @@ error = '''
get allocator failed, %s
'''

["PD:tso:ErrGetAllocatorManager"]
error = '''
get allocator manager failed, %s
'''

["PD:tso:ErrGetLocalAllocator"]
error = '''
get local allocator failed, %s
'''

["PD:tso:ErrKeyspaceGroupIDInvalid"]
error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
32 changes: 22 additions & 10 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@ package errs
import "github.com/pingcap/errors"

const (
// NotLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates a TSO node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
)

// common error in multiple packages
Expand All @@ -31,14 +39,18 @@ var (

// tso errors
var (
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
)

// member errors
Expand Down
9 changes: 8 additions & 1 deletion pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)
Expand All @@ -31,12 +32,18 @@ func newHandler(s *Server) *Handler {
}

// ResetTS resets the ts with specified tso.
// TODO: Support multiple keyspace groups.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocator, err := h.s.GetTSOAllocatorManager().GetAllocator(tso.GlobalDCLocation)
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
}
tsoAllocator, err := tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return err
}
Expand Down
43 changes: 33 additions & 10 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ type Server struct {

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
startCallbacks []func()

// for service registry
serviceID *discovery.ServiceRegistryEntry
serviceRegister *discovery.ServiceRegister
}

Expand Down Expand Up @@ -199,14 +202,30 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing implements basicserver. It returns whether the server is the leader
// if there is embedded etcd, or the primary otherwise.
// TODO: support multiple keyspace groups
func (s *Server) IsServing() bool {
return atomic.LoadInt64(&s.isRunning) == 1 && s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).IsLeader()
if atomic.LoadInt64(&s.isRunning) == 0 {
return false
}

member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
}
return member.IsLeader()
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
// The entry at the index 0 is the primary's service endpoint.
func (s *Server) GetLeaderListenUrls() []string {
return s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).GetLeaderListenUrls()
member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return nil
}

return member.GetLeaderListenUrls()
}

// AddServiceReadyCallback implements basicserver.
Expand All @@ -229,8 +248,8 @@ func (s *Server) IsClosed() bool {
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.keyspaceGroupManager.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) {
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
}

// IsLocalRequest checks if the forwarded host is the current host
Expand Down Expand Up @@ -416,11 +435,16 @@ func (s *Server) startServer() (err error) {
}

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
defaultKsgStorageTSRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.etcdClient, s.listenURL.Host, defaultKsgStorageTSRootPath, tsoSvcRootPath, s.cfg)
s.keyspaceGroupManager.Initialize()
s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
// The param `false` means that we don't initialize the keyspace group manager
// by loading the keyspace group meta from etcd.
if err := s.keyspaceGroupManager.Initialize(false); err != nil {
return err
}

s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.service = &Service{Server: s}
Expand Down Expand Up @@ -448,8 +472,7 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := entry.Serialize()
serializedEntry, err := s.serviceID.Serialize()
if err != nil {
return err
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package endpoint
import (
"fmt"
"path"
"regexp"
"strconv"
"strings"

Expand Down Expand Up @@ -239,6 +240,22 @@ func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// keyspaceGroupIDPathPattern matches a path whose tail is
// `<KeyspaceGroupIDPrefix()>/<exactly five decimal digits>` and captures the digits.
// It is compiled once at package initialization rather than on every call, and the
// prefix is quoted so that it is always matched literally.
var keyspaceGroupIDPathPattern = regexp.MustCompile(
	strings.Join([]string{regexp.QuoteMeta(KeyspaceGroupIDPrefix()), `(\d{5})$`}, "/"))

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
//
// The pattern is anchored only at the end of the path, so any leading prefix
// (for example a cluster root path) is accepted. The id must be encoded as exactly
// five decimal digits, matching the zero-padded format written by encodeKeyspaceGroupID.
//
// It returns an error if the path does not end with the expected pattern or if the
// captured digits cannot be parsed as a uint32.
func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) {
	match := keyspaceGroupIDPathPattern.FindStringSubmatch(path)
	if match == nil {
		return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
	}
	// match[1] is the five-digit capture group.
	id, err := strconv.ParseUint(match[1], 10, 32)
	if err != nil {
		return 0, fmt.Errorf("failed to parse keyspace group ID: %w", err)
	}
	return uint32(id), nil
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand Down
48 changes: 48 additions & 0 deletions pkg/storage/endpoint/key_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,51 @@ func BenchmarkRegionPath(b *testing.B) {
_ = RegionPath(uint64(i))
}
}

// TestExtractKeyspaceGroupIDFromPath verifies that ExtractKeyspaceGroupIDFromPath
// returns the encoded id for paths ending in `tso/keyspace_groups/membership/<5 digits>`
// (with or without a leading root path) and returns an error for every other input.
func TestExtractKeyspaceGroupIDFromPath(t *testing.T) {
	re := require.New(t)

	// Paths that must be accepted, together with the id they encode.
	rightCases := []struct {
		path string
		id   uint32
	}{
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
		{path: "tso/keyspace_groups/membership/00000", id: 0},
		{path: "tso/keyspace_groups/membership/00001", id: 1},
		{path: "tso/keyspace_groups/membership/12345", id: 12345},
		{path: "tso/keyspace_groups/membership/99999", id: 99999},
	}

	for _, tt := range rightCases {
		id, err := ExtractKeyspaceGroupIDFromPath(tt.path)
		// Check the error first: the id is meaningless when err is non-nil.
		re.NoError(err, "path: %q", tt.path)
		re.Equal(tt.id, id, "path: %q", tt.path)
	}

	// Paths that must be rejected: wrong path segments, or an id that is not
	// exactly five decimal digits at the end of the path.
	wrongCases := []struct {
		path string
	}{
		{path: ""},
		{path: "00001"},
		{path: "xxx/keyspace_groups/membership/00001"},
		{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
		{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
		{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
		{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
	}

	for _, tt := range wrongCases {
		_, err := ExtractKeyspaceGroupIDFromPath(tt.path)
		re.Error(err, "path: %q", tt.path)
	}
}
10 changes: 9 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@ import (
"go.etcd.io/etcd/clientv3"
)

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
type KeyspaceGroupMember struct {
// Address is the member's address, serialized under the JSON key "address".
// NOTE(review): presumably the member's service address — confirm against the code that populates it.
Address string `json:"address"`
}

// KeyspaceGroup is the keyspace group.
type KeyspaceGroup struct {
// ID is the keyspace group's identifier, serialized under the JSON key "id".
ID uint32 `json:"id"`
// UserKind is serialized under the JSON key "user-kind".
// NOTE(review): presumably the kind of user this group serves — confirm against where it is set.
UserKind string `json:"user-kind"`
// Members are the election members which campaign for the primary of the keyspace group.
Members []KeyspaceGroupMember `json:"members"`
// Keyspaces are the keyspace IDs which belong to the keyspace group.
Keyspaces []uint32 `json:"keyspaces"`
}

// KeyspaceGroupStorage is the interface for keyspace group storage.
Expand Down
5 changes: 3 additions & 2 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ func (am *AllocatorManager) close() {
log.Info("closed the allocator manager")
}

func (am *AllocatorManager) getMember() *ElectionMember {
return &am.member
func (am *AllocatorManager) getMember() ElectionMember {
return am.member
}

// SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location
Expand Down Expand Up @@ -1072,6 +1072,7 @@ func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
return pdpb.Timestamp{}, err
}

return allocatorGroup.allocator.GenerateTSO(count)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)

// Have dc-locations configured in the cluster, use the Global TSO generation way.
// (with synchronization with other Local TSO Allocators)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(gta.ctx)
defer cancel()
for i := 0; i < maxRetryCount; i++ {
var (
Expand Down Expand Up @@ -237,7 +237,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
skipCheck = true
goto SETTING_PHASE
}
// Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valide.
// If skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid.
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 {
tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc()
}
Expand Down Expand Up @@ -309,7 +309,7 @@ type syncResp struct {

// SyncMaxTS is used to sync MaxTS with all Local TSO Allocator leaders in dcLocationMap.
// If maxTSO is the biggest TSO among all Local TSO Allocators, it will be written into
// each allocator and remines the same after the synchronization.
// each allocator and remains the same after the synchronization.
// If not, it will be replaced with the new max Local TSO and return.
func (gta *GlobalTSOAllocator) SyncMaxTS(
ctx context.Context,
Expand Down
Loading

0 comments on commit de484bc

Please sign in to comment.