diff --git a/client/client.go b/client/client.go index d9cb0358f3f..e1498a49899 100644 --- a/client/client.go +++ b/client/client.go @@ -252,20 +252,20 @@ type serviceModeKeeper struct { // triggering service mode switching concurrently. sync.RWMutex serviceMode pdpb.ServiceMode - tsoClient atomic.Value // *tsoClient + tsoClient *tsoClient tsoSvcDiscovery ServiceDiscovery } -func (smk *serviceModeKeeper) close() { - smk.Lock() - defer smk.Unlock() - switch smk.serviceMode { +func (k *serviceModeKeeper) close() { + k.Lock() + defer k.Unlock() + switch k.serviceMode { case pdpb.ServiceMode_API_SVC_MODE: - smk.tsoSvcDiscovery.Close() + k.tsoSvcDiscovery.Close() fallthrough case pdpb.ServiceMode_PD_SVC_MODE: - if tsoCli := smk.tsoClient.Load(); tsoCli != nil { - tsoCli.(*tsoClient).Close() + if k.tsoClient != nil { + k.tsoClient.Close() } case pdpb.ServiceMode_UNKNOWN_SVC_MODE: } @@ -486,8 +486,8 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { } newTSOCli.Setup() // Replace the old TSO client. - oldTSOClient := c.getTSOClient() - c.tsoClient.Store(newTSOCli) + oldTSOClient := c.tsoClient + c.tsoClient = newTSOCli oldTSOClient.Close() // Replace the old TSO service discovery if needed. oldTSOSvcDiscovery := c.tsoSvcDiscovery @@ -506,11 +506,10 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { zap.String("new-mode", newMode.String())) } -func (c *client) getTSOClient() *tsoClient { - if tsoCli := c.tsoClient.Load(); tsoCli != nil { - return tsoCli.(*tsoClient) - } - return nil +func (c *client) getServiceClientProxy() (*tsoClient, pdpb.ServiceMode) { + c.RLock() + defer c.RUnlock() + return c.tsoClient, c.serviceMode } func (c *client) scheduleUpdateTokenConnection() { @@ -675,7 +674,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur req := tsoReqPool.Get().(*tsoRequest) req.requestCtx = ctx req.clientCtx = c.ctx - tsoClient := c.getTSOClient() + tsoClient, _ := c.getServiceClientProxy() req.start = time.Now() req.dcLocation = dcLocation @@ -704,6 +703,26 @@ func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical in return resp.Wait() } +func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) { + tsoClient, serviceMode := c.getServiceClientProxy() + if tsoClient == nil { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("tso client is nil") + } + + switch serviceMode { + case pdpb.ServiceMode_UNKNOWN_SVC_MODE: + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode") + case pdpb.ServiceMode_PD_SVC_MODE: + // If the service mode is switched to API during GetTS() call, which happens during migration, + // returning the default timeline should be fine. + return c.GetTS(ctx) + case pdpb.ServiceMode_API_SVC_MODE: + return tsoClient.getMinTS(ctx) + default: + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") + } +} + func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { if res.Region == nil { return nil @@ -1395,7 +1414,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map // For test only. func (c *client) GetTSOAllocators() *sync.Map { - tsoClient := c.getTSOClient() + tsoClient, _ := c.getServiceClientProxy() if tsoClient == nil { return nil } diff --git a/client/client_test.go b/client/client_test.go index 5f6a0b89b42..e82fe861a0e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/client/testutil" "github.com/tikv/pd/client/tlsutil" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/goleak" "google.golang.org/grpc" ) @@ -32,13 +33,13 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m, testutil.LeakOptions...) } -func TestTsLessEqual(t *testing.T) { +func TestTSLessEqual(t *testing.T) { re := require.New(t) - re.True(tsLessEqual(9, 9, 9, 9)) - re.True(tsLessEqual(8, 9, 9, 8)) - re.False(tsLessEqual(9, 8, 8, 9)) - re.False(tsLessEqual(9, 8, 9, 6)) - re.True(tsLessEqual(9, 6, 9, 8)) + re.True(tsoutil.TSLessEqual(9, 9, 9, 9)) + re.True(tsoutil.TSLessEqual(8, 9, 9, 8)) + re.False(tsoutil.TSLessEqual(9, 8, 8, 9)) + re.False(tsoutil.TSLessEqual(9, 8, 9, 6)) + re.True(tsoutil.TSLessEqual(9, 6, 9, 8)) } func TestUpdateURLs(t *testing.T) { diff --git a/client/errs/errno.go b/client/errs/errno.go index e4bb7a21a9b..73bbd41541e 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -43,6 +43,7 @@ var ( ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) + ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO")) ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader")) ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo")) diff --git a/client/go.mod b/client/go.mod index aa19f4f3e6a..1bb3a49045f 100644 --- a/client/go.mod +++ b/client/go.mod @@ -8,7 +8,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.8.2 diff --git a/client/go.sum b/client/go.sum index f06d1ee10da..1cfc3e28631 100644 --- a/client/go.sum +++ b/client/go.sum @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/tso_client.go b/client/tso_client.go index 7585fdc34f6..9aae31bba5a 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -22,8 +22,11 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -31,14 +34,17 @@ import ( // TSOClient is the client used to get timestamps. type TSOClient interface { - // GetTS gets a timestamp from PD. + // GetTS gets a timestamp from PD or TSO microservice. GetTS(ctx context.Context) (int64, int64, error) - // GetTSAsync gets a timestamp from PD, without block the caller. + // GetTSAsync gets a timestamp from PD or TSO microservice, without block the caller. GetTSAsync(ctx context.Context) TSFuture - // GetLocalTS gets a local timestamp from PD. + // GetLocalTS gets a local timestamp from PD or TSO microservice. GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error) - // GetLocalTSAsync gets a local timestamp from PD, without block the caller. + // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture + // GetMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from + // the TSO microservice. + GetMinTS(ctx context.Context) (int64, int64, error) } type tsoRequest struct { @@ -275,3 +281,116 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { } return nil, "" } + +// getMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from the TSO microservice. +func (c *tsoClient) getMinTS(ctx context.Context) (physical, logical int64, err error) { + // Immediately refresh the TSO server/pod list + addrs, err := c.svcDiscovery.DiscoverMicroservice(tsoService) + if err != nil { + return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause() + } + if len(addrs) == 0 { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("no tso servers/pods discovered") + } + + // Get the minimal timestamp from the TSO servers/pods + var mutex sync.Mutex + resps := make([]*tsopb.GetMinTSResponse, 0) + wg := sync.WaitGroup{} + wg.Add(len(addrs)) + for _, addr := range addrs { + go func(addr string) { + defer wg.Done() + resp, err := c.getMinTSFromSingleServer(ctx, addr, c.option.timeout) + if err != nil || resp == nil { + log.Warn("[tso] failed to get min ts from tso server", + zap.String("address", addr), zap.Error(err)) + return + } + mutex.Lock() + defer mutex.Unlock() + resps = append(resps, resp) + }(addr) + } + wg.Wait() + + // Check the results. The returned minimal timestamp is valid if all the conditions are met: + // 1. The number of responses is equal to the number of TSO servers/pods. + // 2. The number of keyspace groups asked is equal to the number of TSO servers/pods. + // 3. The minimal timestamp is not zero. + var ( + minTS *pdpb.Timestamp + keyspaceGroupsAsked uint32 + ) + if len(resps) == 0 { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("none of tso server/pod responded") + } + emptyTS := &pdpb.Timestamp{} + keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal + for _, resp := range resps { + if resp.KeyspaceGroupsTotal == 0 { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service has no keyspace group") + } + if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs( + "the tso service has inconsistent keyspace group total count") + } + keyspaceGroupsAsked += resp.KeyspaceGroupsServing + if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 && + (minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) { + minTS = resp.Timestamp + } + } + + if keyspaceGroupsAsked != keyspaceGroupsTotal { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs( + fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d", + keyspaceGroupsAsked, keyspaceGroupsTotal)) + } + + if minTS == nil { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service is not ready") + } + + return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil +} + +func (c *tsoClient) getMinTSFromSingleServer( + ctx context.Context, tsoSrvAddr string, timeout time.Duration, +) (*tsopb.GetMinTSResponse, error) { + cc, err := c.svcDiscovery.GetOrCreateGRPCConn(tsoSrvAddr) + if err != nil { + return nil, errs.ErrClientGetMinTSO.FastGenByArgs( + fmt.Sprintf("can't connect to tso server %s", tsoSrvAddr)) + } + + cctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + resp, err := tsopb.NewTSOClient(cc).GetMinTS( + cctx, &tsopb.GetMinTSRequest{ + Header: &tsopb.RequestHeader{ + ClusterId: c.svcDiscovery.GetClusterID(), + KeyspaceId: c.svcDiscovery.GetKeyspaceID(), + KeyspaceGroupId: c.svcDiscovery.GetKeyspaceGroupID(), + }, + DcLocation: globalDCLocation, + }) + if err != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + err, cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + if resp == nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + "no min ts info collected", cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + if resp.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + + return resp, nil +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 04d2ea41235..d2d62814619 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -715,19 +716,14 @@ func (c *tsoClient) processRequests( return err } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. - firstLogical := addLogical(logical, -count+1, suffixBits) + firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count) c.finishRequest(requests, physical, firstLogical, suffixBits, nil) return nil } -// Because of the suffix, we need to shift the count before we add it to the logical part. -func addLogical(logical, count int64, suffixBits uint32) int64 { - return logical + count<= len(t.addrs) } func (t *tsoServerDiscovery) resetFailure() { @@ -414,8 +415,9 @@ func (c *tsoServiceDiscovery) updateMember() error { } keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout) if err != nil { - c.tsoServerDiscovery.countFailure() - log.Error("[tso] failed to find the keyspace group", errs.ZapError(err)) + if c.tsoServerDiscovery.countFailure() { + log.Error("[tso] failed to find the keyspace group", errs.ZapError(err)) + } return err } c.tsoServerDiscovery.resetFailure() diff --git a/client/tsoutil/tsoutil.go b/client/tsoutil/tsoutil.go new file mode 100644 index 00000000000..ffc449640ac --- /dev/null +++ b/client/tsoutil/tsoutil.go @@ -0,0 +1,46 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsoutil + +import ( + "github.com/pingcap/kvproto/pkg/pdpb" +) + +// AddLogical shifts the count before we add it to the logical part. +func AddLogical(logical, count int64, suffixBits uint32) int64 { + return logical + count< tsoTwo, returns 1. +// If tsoOne = tsoTwo, returns 0. +// If tsoOne < tsoTwo, returns -1. +func CompareTimestamp(tsoOne, tsoTwo *pdpb.Timestamp) int { + if tsoOne.GetPhysical() > tsoTwo.GetPhysical() || (tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() > tsoTwo.GetLogical()) { + return 1 + } + if tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() == tsoTwo.GetLogical() { + return 0 + } + return -1 +} diff --git a/errors.toml b/errors.toml index d425288d955..540ed5c3e13 100644 --- a/errors.toml +++ b/errors.toml @@ -746,6 +746,11 @@ error = ''' get local allocator failed, %s ''' +["PD:tso:ErrGetMinTS"] +error = ''' +get min ts failed, %s +''' + ["PD:tso:ErrKeyspaceGroupIDInvalid"] error = ''' the keyspace group id is invalid, %s diff --git a/go.mod b/go.mod index 92bf0fb4f0f..8413fd77b47 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 diff --git a/go.sum b/go.sum index 56e556f7789..1c4b9276aa5 100644 --- a/go.sum +++ b/go.sum @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 37524798046..8e9fb83de09 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -54,6 +54,7 @@ var ( ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsRetryExhausted")) ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized")) ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) + ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS")) ) // member errors diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 8b5765b1875..dd0a96b1cba 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -36,7 +36,8 @@ import ( // gRPC errors var ( - ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched") ) var _ tsopb.TSOServer = (*Service)(nil) @@ -157,6 +158,13 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { func (s *Service) FindGroupByKeyspaceID( ctx context.Context, request *tsopb.FindGroupByKeyspaceIDRequest, ) (*tsopb.FindGroupByKeyspaceIDResponse, error) { + respKeyspaceGroup := request.GetHeader().GetKeyspaceGroupId() + if errorType, err := s.validRequest(request.GetHeader()); err != nil { + return &tsopb.FindGroupByKeyspaceIDResponse{ + Header: s.wrapErrorToHeader(errorType, err.Error(), respKeyspaceGroup), + }, nil + } + keyspaceID := request.GetKeyspaceId() am, keyspaceGroup, keyspaceGroupID, err := s.keyspaceGroupManager.FindGroupByKeyspaceID(keyspaceID) if err != nil { @@ -199,6 +207,47 @@ func (s *Service) FindGroupByKeyspaceID( }, nil } +// GetMinTS gets the minimum timestamp across all keyspace groups served by the TSO server +// who receives and handles the request. +func (s *Service) GetMinTS( + ctx context.Context, request *tsopb.GetMinTSRequest, +) (*tsopb.GetMinTSResponse, error) { + respKeyspaceGroup := request.GetHeader().GetKeyspaceGroupId() + if errorType, err := s.validRequest(request.GetHeader()); err != nil { + return &tsopb.GetMinTSResponse{ + Header: s.wrapErrorToHeader(errorType, err.Error(), respKeyspaceGroup), + }, nil + } + + minTS, kgAskedCount, kgTotalCount, err := s.keyspaceGroupManager.GetMinTS(request.GetDcLocation()) + if err != nil { + return &tsopb.GetMinTSResponse{ + Header: s.wrapErrorToHeader( + tsopb.ErrorType_UNKNOWN, err.Error(), respKeyspaceGroup), + Timestamp: &minTS, + KeyspaceGroupsServing: kgAskedCount, + KeyspaceGroupsTotal: kgTotalCount, + }, nil + } + + return &tsopb.GetMinTSResponse{ + Header: s.header(respKeyspaceGroup), + Timestamp: &minTS, + KeyspaceGroupsServing: kgAskedCount, + KeyspaceGroupsTotal: kgTotalCount, + }, nil +} + +func (s *Service) validRequest(header *tsopb.RequestHeader) (tsopb.ErrorType, error) { + if s.IsClosed() || s.keyspaceGroupManager == nil { + return tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted + } + if header == nil || header.GetClusterId() != s.clusterID { + return tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched + } + return tsopb.ErrorType_OK, nil +} + func (s *Service) header(keyspaceGroupBelongTo uint32) *tsopb.ResponseHeader { if s.clusterID == 0 { return s.wrapErrorToHeader( diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 9141e85af19..a780e7da74e 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -1070,7 +1070,7 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) { // HandleRequest forwards TSO allocation requests to correct TSO Allocators. func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb.Timestamp, error) { - if dcLocation == "" { + if len(dcLocation) == 0 { dcLocation = GlobalDCLocation } allocatorGroup, exist := am.getAllocatorGroup(dcLocation) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 732b3954797..7038ef8e373 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -688,6 +688,52 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { fmt.Sprintf("%d shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) } +// GetMinTS returns the minimum timestamp across all keyspace groups served by this TSO server/pod. +func (kgm *KeyspaceGroupManager) GetMinTS( + dcLocation string, +) (_ pdpb.Timestamp, kgAskedCount, kgTotalCount uint32, err error) { + kgm.RLock() + defer kgm.RUnlock() + + var minTS *pdpb.Timestamp + for i, am := range kgm.ams { + if kgm.kgs[i] != nil { + kgTotalCount++ + } + // If any keyspace group hasn't elected primary, we can't know its current timestamp of + // the group, so as to the min ts across all keyspace groups. Return error in this case. + if am != nil && !am.member.IsLeaderElected() { + return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, errs.ErrGetMinTS.FastGenByArgs("leader is not elected") + } + // Skip the keyspace groups that are not served by this TSO Server/Pod. + if am == nil || !am.IsLeader() { + continue + } + kgAskedCount++ + // Skip the keyspace groups that are split targets, because they always have newer + // time lines than the existing split sources thus won't contribute to the min ts. + if kgm.kgs[i] != nil && kgm.kgs[i].IsSplitTarget() { + continue + } + ts, err := am.HandleRequest(dcLocation, 1) + if err != nil { + return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, err + } + if minTS == nil || tsoutil.CompareTimestamp(&ts, minTS) < 0 { + minTS = &ts + } + } + + if minTS == nil { + // This TSO server/pod is not serving any keyspace group, return an empty timestamp, + // and the client needs to skip the empty timestamps when collecting the min timestamp + // from all TSO servers/pods. + return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, nil + } + + return *minTS, kgAskedCount, kgTotalCount, nil +} + func genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error { return perr.FastGenByArgs( fmt.Sprintf( diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index 98f6b733717..c0c409d24c9 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 697e6e4ceb1..056801db320 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index db801e1ad63..aae3bdeb93a 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -12,7 +12,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.2 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 54afef52efe..68c319e0b94 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 31286c8e190..799fccd42e4 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -98,20 +98,22 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp // a keyspace group before, will be served by the default keyspace group. re := suite.Require() testutil.Eventually(re, func() bool { - for _, server := range suite.tsoCluster.GetServers() { - allServed := true - for _, keyspaceID := range []uint32{0, 1, 2} { + for _, keyspaceID := range []uint32{0, 1, 2} { + served := false + for _, server := range suite.tsoCluster.GetServers() { if server.IsKeyspaceServing(keyspaceID, mcsutils.DefaultKeyspaceGroupID) { tam, err := server.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(tam) - } else { - allServed = false + served = true + break } } - return allServed + if !served { + return false + } } - return false + return true }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) // Any keyspace that was assigned to a keyspace group before, except default keyspace, diff --git a/tests/integrations/tso/__debug_bin b/tests/integrations/tso/__debug_bin new file mode 100755 index 00000000000..d364acb99a5 Binary files /dev/null and b/tests/integrations/tso/__debug_bin differ diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 7b787d3c16f..204534e7ffa 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -137,6 +137,24 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) } + // Make sure all keyspace groups are available. + testutil.Eventually(re, func() bool { + for _, keyspaceID := range suite.keyspaceIDs { + served := false + for _, server := range suite.tsoCluster.GetServers() { + if server.IsKeyspaceServing(keyspaceID, mcsutils.DefaultKeyspaceGroupID) { + served = true + break + } + } + if !served { + return false + } + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + // Create clients and make sure they all have discovered the tso service. suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) re.Equal(len(suite.keyspaceIDs), len(suite.clients)) @@ -225,6 +243,38 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { wg.Wait() } +// TestGetMinTS tests the correctness of GetMinTS. +func (suite *tsoClientTestSuite) TestGetMinTS() { + var wg sync.WaitGroup + wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range suite.clients { + go func(client pd.Client) { + defer wg.Done() + var lastMinTS uint64 + for j := 0; j < tsoRequestRound; j++ { + physical, logical, err := client.GetMinTS(suite.ctx) + suite.NoError(err) + minTS := tsoutil.ComposeTS(physical, logical) + suite.Less(lastMinTS, minTS) + lastMinTS = minTS + + // Now we check whether the returned ts is the minimum one + // among all keyspace groups, i.e., the returned ts is + // less than the new timestamps of all keyspace groups. + for _, client := range suite.clients { + physical, logical, err := client.GetTS(suite.ctx) + suite.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + suite.Less(minTS, ts) + } + } + }(client) + } + } + wg.Wait() +} + // More details can be found in this issue: https://github.com/tikv/pd/issues/4884 func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { re := suite.Require() diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 401d05bb151..910be73dea0 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 8b573f3ed9e..cc3d7e6bfd2 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -383,8 +383,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index ab85155db70..f4e31d3679a 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -851,8 +851,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=