Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20251218093338-9f0ac2fc9a1a
github.com/pingcap/kvproto v0.0.0-20260204083606-f3cc97745222
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
Expand All @@ -31,6 +31,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/sync v0.12.0
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
modernc.org/mathutil v1.7.1
)

Expand All @@ -55,7 +56,6 @@ require (
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXH
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20251218093338-9f0ac2fc9a1a h1:uGZ0XGBMtcJTmh+6mlpF/SCRZhJXOPES7lx0oY3NBas=
github.com/pingcap/kvproto v0.0.0-20251218093338-9f0ac2fc9a1a/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20260204083606-f3cc97745222 h1:F5Gur7EuPv8TUZDPgyj2TqVMP8qrCuiVdefy7s9Tp4Q=
github.com/pingcap/kvproto v0.0.0-20260204083606-f3cc97745222/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20251218093338-9f0ac2fc9a1a
github.com/pingcap/kvproto v0.0.0-20260204083606-f3cc97745222
github.com/pingcap/tidb v1.1.0-beta.0.20250609033843-a165d9fd7c01
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1445,8 +1445,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20251218093338-9f0ac2fc9a1a h1:uGZ0XGBMtcJTmh+6mlpF/SCRZhJXOPES7lx0oY3NBas=
github.com/pingcap/kvproto v0.0.0-20251218093338-9f0ac2fc9a1a/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20260204083606-f3cc97745222 h1:F5Gur7EuPv8TUZDPgyj2TqVMP8qrCuiVdefy7s9Tp4Q=
github.com/pingcap/kvproto v0.0.0-20260204083606-f3cc97745222/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGamuWedx9LRm0nrHvsQRQiW8SxEs=
Expand Down
3 changes: 3 additions & 0 deletions internal/apicodec/codec_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error)
r := *req.SplitRegion()
r.SplitKeys = c.encodeKeys(r.SplitKeys)
req.Req = &r
case tikvrpc.CmdVersionedCop:
// TODO: Support VersionedCop in APIv2.
return nil, errors.New("codecV2 does not support VersionedCop request")
}

return req, nil
Expand Down
12 changes: 10 additions & 2 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,20 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
return wrapErrConn(tikvrpc.CallDebugRPC(ctx1, client, req))
}

client := tikvpb.NewTikvClient(clientConn)

// Set metadata for request forwarding. Needn't forward DebugReq.
if req.ForwardedHost != "" {
ctx = metadata.AppendToOutgoingContext(ctx, forwardMetadataKey, req.ForwardedHost)
}

// Version-aware coprocessor requests should go through VersionedKv services.
if req.Type == tikvrpc.CmdVersionedCop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BatchReq would be used by default at L354, I suppose this may not be reached, have you executed E2E test for this PR?

Copy link
Contributor Author

@wshwsh12 wshwsh12 Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"ToBatchCommandsRequest" can't converts CmdVersionedCop to BatchReq and return nil. So now CmdVersionedCop always reached this path.
And I have run some sqls in my local and it works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some performance risks. How often is this CmdVersionedCop called? If the frequency is high, I think it's necessary to modify kvproto to include it in the BatchReq and BatchResp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this feature does not have high performance requirements, and RPC requests are not the performance bottleneck. We can optimize it later if performance becomes an issue.”

client := tikvpb.NewVersionedKvClient(clientConn)
ctx1, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return wrapErrConn(tikvrpc.CallVersionedKV(ctx1, client, req))
}

client := tikvpb.NewTikvClient(clientConn)
switch req.Type {
case tikvrpc.CmdBatchCop:
return wrapErrConn(c.getBatchCopStreamResponse(ctx, client, req, timeout, connPool))
Expand Down
74 changes: 74 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"fmt"
"math"
"math/rand"
"net"
"runtime"
"strconv"
"strings"
Expand All @@ -61,8 +62,10 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/async"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/test/bufconn"
)

func TestConn(t *testing.T) {
Expand Down Expand Up @@ -1153,3 +1156,74 @@ func TestBatchPolicy(t *testing.T) {
}
})
}

type versionedCoprocessorRoutingServer struct {
tikvpb.UnimplementedTikvServer
tikvpb.UnimplementedVersionedKvServer

tikvCalled int32
versionedCalled int32
}

func (s *versionedCoprocessorRoutingServer) Coprocessor(ctx context.Context, req *coprocessor.Request) (*coprocessor.Response, error) {
atomic.AddInt32(&s.tikvCalled, 1)
return &coprocessor.Response{Data: []byte("tikv")}, nil
}

func (s *versionedCoprocessorRoutingServer) VersionedCoprocessor(ctx context.Context, req *coprocessor.Request) (*coprocessor.Response, error) {
atomic.AddInt32(&s.versionedCalled, 1)
return &coprocessor.Response{Data: []byte("versioned")}, nil
}

func TestVersionedCoprocessorRouting(t *testing.T) {
defer config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.MaxBatchSize = 0
})()

const bufSize = 1024 * 1024
lis := bufconn.Listen(bufSize)
t.Cleanup(func() { _ = lis.Close() })

srv := grpc.NewServer()
serverImpl := &versionedCoprocessorRoutingServer{}
tikvpb.RegisterTikvServer(srv, serverImpl)
tikvpb.RegisterVersionedKvServer(srv, serverImpl)
go func() { _ = srv.Serve(lis) }()
t.Cleanup(srv.Stop)

client := NewRPCClient(WithGRPCDialOptions(grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
return lis.Dial()
})))
t.Cleanup(func() { _ = client.Close() })

addr := "bufnet"

req := tikvrpc.NewRequest(tikvrpc.CmdVersionedCop, &coprocessor.Request{
VersionedRanges: []*coprocessor.VersionedKeyRange{{
Range: &coprocessor.KeyRange{Start: []byte("a"), End: []byte("b")},
ReadTs: 1,
}},
})
req.StoreTp = tikvrpc.TiKV

resp, err := client.SendRequest(context.Background(), addr, req, 3*time.Second)
require.NoError(t, err)
require.Equal(t, "versioned", string(resp.Resp.(*coprocessor.Response).Data))
require.Equal(t, int32(0), atomic.LoadInt32(&serverImpl.tikvCalled))
require.Equal(t, int32(1), atomic.LoadInt32(&serverImpl.versionedCalled))

// VersionedRanges should not affect routing; non-versioned cmd must still call TiKV services.
req = tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{
VersionedRanges: []*coprocessor.VersionedKeyRange{{
Range: &coprocessor.KeyRange{Start: []byte("a"), End: []byte("b")},
ReadTs: 1,
}},
})
req.StoreTp = tikvrpc.TiKV

resp, err = client.SendRequest(context.Background(), addr, req, 3*time.Second)
require.NoError(t, err)
require.Equal(t, "tikv", string(resp.Resp.(*coprocessor.Response).Data))
require.Equal(t, int32(1), atomic.LoadInt32(&serverImpl.tikvCalled))
require.Equal(t, int32(1), atomic.LoadInt32(&serverImpl.versionedCalled))
}
7 changes: 5 additions & 2 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ func (s *sendReqState) send() (canceled bool) {
s.Stats.RecordRPCRuntimeStats(req.Type, rpcDuration)
if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil {
if val.(bool) {
if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 {
if (req.Type == tikvrpc.CmdCop || req.Type == tikvrpc.CmdVersionedCop) && bo.GetTotalSleep() == 0 {
s.vars.resp, s.vars.err = &tikvrpc.Response{
Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}},
}, nil
Expand Down Expand Up @@ -2258,7 +2258,10 @@ func (s *RegionRequestSender) validateReadTS(ctx context.Context, req *tikvrpc.R

var readTS uint64
switch req.Type {
case tikvrpc.CmdGet, tikvrpc.CmdScan, tikvrpc.CmdBatchGet, tikvrpc.CmdCop, tikvrpc.CmdCopStream, tikvrpc.CmdBatchCop, tikvrpc.CmdScanLock, tikvrpc.CmdBufferBatchGet:
case tikvrpc.CmdGet, tikvrpc.CmdScan, tikvrpc.CmdBatchGet,
tikvrpc.CmdCop, tikvrpc.CmdCopStream, tikvrpc.CmdBatchCop,
tikvrpc.CmdVersionedCop,
tikvrpc.CmdScanLock, tikvrpc.CmdBufferBatchGet:
readTS = req.GetStartTS()

// TODO: Check transactional write requests that has implicit read.
Expand Down
3 changes: 2 additions & 1 deletion internal/locate/replica_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ func isReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
func isReadReq(tp tikvrpc.CmdType) bool {
switch tp {
case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream:
tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream,
tikvrpc.CmdVersionedCop:
return true
default:
return false
Expand Down
11 changes: 11 additions & 0 deletions tikvrpc/cmds_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tikvrpc/gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cmds=(
PhysicalScanLock
Cop
BatchCop
VersionedCop
MvccGetByKey
MvccGetByStartTs
SplitRegion
Expand Down
30 changes: 29 additions & 1 deletion tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
CmdMPPConn // TODO: These non TiKV RPCs should be moved out of TiKV client
CmdMPPCancel // TODO: These non TiKV RPCs should be moved out of TiKV client
CmdMPPAlive // TODO: These non TiKV RPCs should be moved out of TiKV client
CmdVersionedCop

CmdMvccGetByKey CmdType = 1024 + iota
CmdMvccGetByStartTs
Expand Down Expand Up @@ -192,6 +193,8 @@ func (t CmdType) String() string {
return "CopStream"
case CmdBatchCop:
return "BatchCop"
case CmdVersionedCop:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest putting CmdVersionedCop right after CmdCop wherever it appears to keep consistency, except CmdVersionedCop is at the end of the CmdType enum.

return "VersionedCop"
case CmdMPPTask:
return "DispatchMPPTask"
case CmdMPPConn:
Expand Down Expand Up @@ -490,6 +493,11 @@ func (req *Request) Cop() *coprocessor.Request {
return req.Req.(*coprocessor.Request)
}

// VersionedCop returns coprocessor request in request.
func (req *Request) VersionedCop() *coprocessor.Request {
return req.Req.(*coprocessor.Request)
}

// BatchCop returns BatchCop request in request.
func (req *Request) BatchCop() *coprocessor.BatchRequest {
return req.Req.(*coprocessor.BatchRequest)
Expand Down Expand Up @@ -697,6 +705,8 @@ func (req *Request) GetSize() int {
size = req.Scan().Size()
case CmdCop:
size = req.Cop().Size()
case CmdVersionedCop:
size = req.VersionedCop().Size()
case CmdPrewrite:
size = req.Prewrite().Size()
case CmdCommit:
Expand Down Expand Up @@ -1005,7 +1015,7 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
p = &kvrpcpb.RawChecksumResponse{
RegionError: e,
}
case CmdCop:
case CmdCop, CmdVersionedCop:
p = &coprocessor.Response{
RegionError: e,
}
Expand Down Expand Up @@ -1301,6 +1311,22 @@ func CallDebugRPC(ctx context.Context, client debugpb.DebugClient, req *Request)
return resp, err
}

// CallVersionedKV launches a versioned kv rpc call.
func CallVersionedKV(ctx context.Context, client tikvpb.VersionedKvClient, req *Request) (*Response, error) {
resp := &Response{}
var err error
switch req.Type {
case CmdVersionedCop:
resp.Resp, err = client.VersionedCoprocessor(ctx, req.VersionedCop())
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
if err != nil {
return nil, errors.WithStack(err)
}
return resp, nil
}

// Lease is used to implement grpc stream timeout.
type Lease struct {
Cancel context.CancelFunc
Expand Down Expand Up @@ -1507,6 +1533,8 @@ func (req *Request) GetStartTS() uint64 {
return req.Cop().GetStartTs()
case CmdBatchCop:
return req.BatchCop().GetStartTs()
case CmdVersionedCop:
return req.VersionedCop().GetStartTs()
case CmdMvccGetByStartTs:
return req.MvccGetByStartTs().GetStartTs()
default:
Expand Down
1 change: 1 addition & 0 deletions tikvrpc/tikvrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestTiDB51921(t *testing.T) {
NewRequest(CmdCop, &coprocessor.Request{}),
NewRequest(CmdCopStream, &coprocessor.Request{}),
NewRequest(CmdBatchCop, &coprocessor.BatchRequest{}),
NewRequest(CmdVersionedCop, &coprocessor.Request{}),
NewRequest(CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{}),
NewRequest(CmdMvccGetByStartTs, &kvrpcpb.MvccGetByStartTsRequest{}),
NewRequest(CmdSplitRegion, &kvrpcpb.SplitRegionRequest{}),
Expand Down
Loading