Skip to content

Commit

Permalink
Verify returned timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 8, 2023
1 parent 3189f28 commit f193fe4
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
5 changes: 0 additions & 5 deletions pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
Expand All @@ -34,10 +33,6 @@ const (
DefaultTSOProxyTimeout = 3 * time.Second
)

type tsoResp interface {
GetTimestamp() *pdpb.Timestamp
}

// TSODispatcher dispatches the TSO requests to the corresponding forwarding TSO channels.
type TSODispatcher struct {
tsoProxyHandleDuration prometheus.Histogram
Expand Down
6 changes: 3 additions & 3 deletions pkg/utils/tsoutil/tso_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Request interface {
getCount() uint32
// process sends request and receive response via stream.
// count defines the count of timestamps to retrieve.
process(ctx context.Context, forwardStream stream, count uint32, tsoProtoFactory ProtoFactory) (tsoResp, error)
process(ctx context.Context, forwardStream stream, count uint32, tsoProtoFactory ProtoFactory) (response, error)
// sendResponseAsync sends the response back to the sender of the request
sendResponseAsync(countSum, physical, firstLogical int64, suffixBits uint32) int64
// sendErrorResponseAsync creates an tso error response and sends it back to the client asynchronously.
Expand Down Expand Up @@ -91,7 +91,7 @@ func (r *TSOProtoRequest) getCount() uint32 {
// count defines the count of timestamps to retrieve.
func (r *TSOProtoRequest) process(
ctx context.Context, forwardStream stream, count uint32, tsoProtoFactory ProtoFactory,
) (tsoResp, error) {
) (response, error) {
return forwardStream.process(ctx, r.request.GetHeader().GetClusterId(), count,
r.request.GetHeader().GetKeyspaceId(), r.request.GetHeader().GetKeyspaceGroupId(), r.request.GetDcLocation())
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (r *PDProtoRequest) getCount() uint32 {
// count defines the count of timestamps to retrieve.
func (r *PDProtoRequest) process(
ctx context.Context, forwardStream stream, count uint32, tsoProtoFactory ProtoFactory,
) (tsoResp, error) {
) (response, error) {
return forwardStream.process(ctx, r.request.GetHeader().GetClusterId(), count,
utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID, r.request.GetDcLocation())
}
Expand Down
8 changes: 7 additions & 1 deletion tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/tsoutil"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
"google.golang.org/grpc"
Expand Down Expand Up @@ -126,14 +127,19 @@ func (s *tsoProxyTestSuite) verifyTSOProxy(
wg.Add(1)
go func(streamCopy pdpb.PD_TsoClient) {
defer wg.Done()
lastPhysical, lastLogical := int64(0), int64(0)
for i := 0; i < requestsPerClient; i++ {
req := reqs[rand.Intn(requestsPerClient)]
err := streamCopy.Send(req)
re.NoError(err)
resp, err := streamCopy.Recv()
re.NoError(err)
re.Equal(req.GetCount(), resp.GetCount())
fmt.Printf("client %v, req %v, resp %v\n", streamCopy, req, resp)
ts := resp.GetTimestamp()
count := int64(resp.GetCount())
physical, largestLogic, suffixBits := ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
firstLogical := tsoutil.AddLogical(largestLogic, -count+1, suffixBits)
re.False(tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical))
}
}(streamCopy)
}
Expand Down

0 comments on commit f193fe4

Please sign in to comment.