Skip to content

Commit

Permalink
store/tikv: forward requests by unary call (#23362)
Browse files Browse the repository at this point in the history
  • Loading branch information
youjiali1995 authored Mar 17, 2021
1 parent 080ed4e commit 77713d2
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 31 deletions.
15 changes: 14 additions & 1 deletion store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)

// MaxRecvMsgSize set max gRPC receive message size received from server. If any message size is larger than
Expand All @@ -70,6 +71,9 @@ const (
grpcInitialConnWindowSize = 1 << 30
)

// forwardMetadataKey is the key of gRPC metadata which represents a forwarded request.
const forwardMetadataKey = "tikv-forwarded-host"

// Client is a client that sends RPC.
// It should not be used after calling Close().
type Client interface {
Expand Down Expand Up @@ -354,6 +358,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
c.recycleMu.Unlock()
}

// enableBatch means TiDB can send BatchCommands to the connection. It doesn't mean TiDB must do it.
// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
enableBatch := req.StoreTp != kv.TiDB && req.StoreTp != kv.TiFlash
c.recycleMu.RLock()
Expand All @@ -363,9 +368,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return nil, errors.Trace(err)
}

// TiDB uses [gRPC-metadata](https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md) to
// indicate a request needs forwarding. gRPC doesn't support setting a metadata for each request in a stream,
// so we don't use BatchCommands for forwarding for now.
canBatch := enableBatch && req.ForwardedHost == ""
// TiDB RPC server supports batch RPC, but a batch connection sends heartbeats, which is unnecessary since
// requests to TiDB are not high frequency.
if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && canBatch {
if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
defer trace.StartRegion(ctx, req.Type.String()).End()
return sendBatchRequest(ctx, addr, connArray.batchConn, batchReq, timeout)
Expand All @@ -387,6 +396,10 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R

client := tikvpb.NewTikvClient(clientConn)

// Set metadata for request forwarding. Needn't forward DebugReq.
if req.ForwardedHost != "" {
ctx = metadata.AppendToOutgoingContext(ctx, forwardMetadataKey, req.ForwardedHost)
}
switch req.Type {
case tikvrpc.CmdBatchCop:
return c.getBatchCopStreamResponse(ctx, client, req, timeout, connArray)
Expand Down
85 changes: 76 additions & 9 deletions store/tikv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"google.golang.org/grpc/metadata"
)

func TestT(t *testing.T) {
Expand All @@ -46,15 +49,10 @@ var _ = Suite(&testClientSuite{})
var _ = SerialSuites(&testClientFailSuite{})
var _ = SerialSuites(&testClientSerialSuite{})

// setMaxBatchSize installs a fresh default config as the global config with
// TiKVClient.MaxBatchSize set to size. Every other setting falls back to its
// default value because the new config starts from config.DefaultConfig().
func setMaxBatchSize(size uint) {
	conf := config.DefaultConfig()
	conf.TiKVClient.MaxBatchSize = size
	config.StoreGlobalConfig(&conf)
}

func (s *testClientSerialSuite) TestConn(c *C) {
maxBatchSize := config.GetGlobalConfig().TiKVClient.MaxBatchSize
setMaxBatchSize(0)
defer config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.MaxBatchSize = 0
})()

client := NewRPCClient(config.Security{})

Expand All @@ -70,7 +68,6 @@ func (s *testClientSerialSuite) TestConn(c *C) {
conn3, err := client.getConnArray(addr, true)
c.Assert(err, NotNil)
c.Assert(conn3, IsNil)
setMaxBatchSize(maxBatchSize)
}

func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
Expand Down Expand Up @@ -229,3 +226,73 @@ func (s *testClientSuite) TestCollapseResolveLock(c *C) {
default:
}
}

// TestForwardMetadata checks how RPCClient.SendRequest handles Request.ForwardedHost:
// the forwardMetadataKey gRPC metadata must be absent when ForwardedHost is empty,
// must carry ForwardedHost otherwise, and forwarded requests must not be sent
// through the BatchCommands stream.
func (s *testClientSuite) TestForwardMetadata(c *C) {
	server, port := startMockTikvService()
	c.Assert(port > 0, IsTrue)
	defer server.Stop()
	addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)

	// Enable batch and limit the connection count to 1 so that
	// there is only one BatchCommands stream.
	defer config.UpdateGlobal(func(conf *config.Config) {
		conf.TiKVClient.MaxBatchSize = 128
		conf.TiKVClient.GrpcConnectionCount = 1
	})()
	rpcClient := NewRPCClient(config.Security{})
	defer rpcClient.closeConns()

	// The checkers below are invoked from the mock server's request handlers, not
	// from the goroutine running this test. They therefore report a mismatch by
	// returning an error, which fails the corresponding SendRequest and is caught
	// by the assertions in this goroutine, instead of calling c.Assert themselves.
	var checkCnt uint64
	// Check no corresponding metadata if ForwardedHost is empty.
	server.setMetaChecker(func(ctx context.Context) error {
		atomic.AddUint64(&checkCnt, 1)
		// gRPC may set some metadata by default, e.g. "content-type".
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			return nil
		}
		if vals := md.Get(forwardMetadataKey); len(vals) != 0 {
			return fmt.Errorf("unexpected %s metadata: %v", forwardMetadataKey, vals)
		}
		return nil
	})

	// Prewrite represents unary-unary call.
	prewriteReq := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{})
	for i := 0; i < 3; i++ {
		_, err := rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
		c.Assert(err, IsNil)
	}
	// checkCnt should be 1 because BatchCommands is a stream-stream call.
	c.Assert(atomic.LoadUint64(&checkCnt), Equals, uint64(1))

	// CopStream represents unary-stream call.
	copStreamReq := tikvrpc.NewRequest(tikvrpc.CmdCopStream, &coprocessor.Request{})
	_, err := rpcClient.SendRequest(context.Background(), addr, copStreamReq, 10*time.Second)
	c.Assert(err, IsNil)
	c.Assert(atomic.LoadUint64(&checkCnt), Equals, uint64(2))

	// Reset through sync/atomic as well, so every access to checkCnt is atomic.
	atomic.StoreUint64(&checkCnt, 0)
	forwardedHost := "127.0.0.1:6666"
	// Check the metadata exists.
	server.setMetaChecker(func(ctx context.Context) error {
		atomic.AddUint64(&checkCnt, 1)
		// gRPC may set some metadata by default, e.g. "content-type".
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			return fmt.Errorf("no incoming metadata, want %s=%q", forwardMetadataKey, forwardedHost)
		}
		vals := md.Get(forwardMetadataKey)
		if len(vals) != 1 || vals[0] != forwardedHost {
			return fmt.Errorf("got %s metadata %v, want [%s]", forwardMetadataKey, vals, forwardedHost)
		}
		return nil
	})

	prewriteReq.ForwardedHost = forwardedHost
	for i := 0; i < 3; i++ {
		_, err = rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
		c.Assert(err, IsNil)
	}
	// checkCnt should be 3 because we don't use BatchCommands for redirection for now.
	c.Assert(atomic.LoadUint64(&checkCnt), Equals, uint64(3))

	copStreamReq.ForwardedHost = forwardedHost
	_, err = rpcClient.SendRequest(context.Background(), addr, copStreamReq, 10*time.Second)
	c.Assert(err, IsNil)
	c.Assert(atomic.LoadUint64(&checkCnt), Equals, uint64(4))
}
12 changes: 12 additions & 0 deletions store/tikv/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ func StoreGlobalConfig(config *Config) {
globalConf.Store(config)
}

// UpdateGlobal applies f to a copy of the current global config, stores that
// copy as the new global config, and returns a function that stores the
// previous config back when called.
func UpdateGlobal(f func(conf *Config)) func() {
	previous := GetGlobalConfig()

	// Mutate a copy so the previously stored config stays untouched and can be restored.
	updated := *previous
	f(&updated)
	StoreGlobalConfig(&updated)

	return func() {
		StoreGlobalConfig(previous)
	}
}

// ParsePath parses this path.
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false
func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
Expand Down
56 changes: 53 additions & 3 deletions store/tikv/mock_tikv_service_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package tikv

import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/store/tikv/logutil"
"go.uber.org/zap"
Expand All @@ -13,9 +17,33 @@ import (

// server is a mock TiKV gRPC service used by the tests in this package. It
// embeds tikvpb.TikvServer and defines its own handlers for the RPCs the tests send.
type server struct {
tikvpb.TikvServer
// grpcServer is the gRPC server this service is registered with; Stop stops it.
grpcServer *grpc.Server
// metaChecker checks the metadata of each request. Now only requests
// which need redirection set it.
metaChecker struct {
// The embedded mutex guards check.
sync.Mutex
// check, if non-nil, is called by checkMetadata with the request's context.
check func(context.Context) error
}
}

// KvPrewrite runs the installed metadata check against the request context and,
// if the check passes, replies with an empty PrewriteResponse. The request
// payload itself is ignored.
func (s *server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
	err := s.checkMetadata(ctx)
	if err != nil {
		return nil, err
	}
	resp := new(kvrpcpb.PrewriteResponse)
	return resp, nil
}

// CoprocessorStream runs the installed metadata check against the stream's
// context and, if the check passes, sends a single empty coprocessor.Response.
// The request payload itself is ignored.
func (s *server) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error {
	err := s.checkMetadata(ss.Context())
	if err == nil {
		err = ss.Send(&coprocessor.Response{})
	}
	return err
}

func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
if err := s.checkMetadata(ss.Context()); err != nil {
return err
}
for {
req, err := ss.Recv()
if err != nil {
Expand Down Expand Up @@ -43,8 +71,27 @@ func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
}
}

// setMetaChecker installs check as the function checkMetadata runs for each
// incoming request. Passing nil removes the current checker.
func (s *server) setMetaChecker(check func(context.Context) error) {
	mc := &s.metaChecker
	mc.Lock()
	defer mc.Unlock()
	mc.check = check
}

// checkMetadata runs the installed metadata checker, if any, on ctx and returns
// its result. It returns nil when no checker is installed. The checker is
// invoked while the metaChecker mutex is held.
func (s *server) checkMetadata(ctx context.Context) error {
	s.metaChecker.Lock()
	defer s.metaChecker.Unlock()

	check := s.metaChecker.check
	if check == nil {
		return nil
	}
	return check(ctx)
}

// Stop stops the gRPC server that this mock service is registered with.
func (s *server) Stop() {
s.grpcServer.Stop()
}

// Try to start a gRPC server and return the server instance and bound port.
func startMockTikvService() (*grpc.Server, int) {
func startMockTikvService() (*server, int) {
port := -1
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 0))
if err != nil {
Expand All @@ -53,8 +100,11 @@ func startMockTikvService() (*grpc.Server, int) {
return nil, port
}
port = lis.Addr().(*net.TCPAddr).Port

server := &server{}
s := grpc.NewServer(grpc.ConnectionTimeout(time.Minute))
tikvpb.RegisterTikvServer(s, &server{})
tikvpb.RegisterTikvServer(s, server)
server.grpcServer = s
go func() {
if err = s.Serve(lis); err != nil {
logutil.BgLogger().Error(
Expand All @@ -63,5 +113,5 @@ func startMockTikvService() (*grpc.Server, int) {
)
}
}()
return s, port
return server, port
}
4 changes: 2 additions & 2 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ type RegionCache struct {

mu struct {
sync.RWMutex // mutex protect cached region
regions map[RegionVerID]*Region // cached regions be organized as regionVerID to region ref mapping
sorted *btree.BTree // cache regions be organized as sorted key to region ref mapping
regions map[RegionVerID]*Region // cached regions are organized as regionVerID to region ref mapping
sorted *btree.BTree // cache regions are organized as sorted key to region ref mapping
}
storeMu struct {
sync.RWMutex
Expand Down
37 changes: 21 additions & 16 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ type Request struct {
ReplicaReadType kv.ReplicaReadType // different from `kvrpcpb.Context.ReplicaRead`
ReplicaReadSeed *uint32 // pointer to follower read seed in snapshot/coprocessor
StoreTp kv.StoreType
// ForwardedHost is the address of a store which will handle the request. It's different from
// the address the request is sent to.
// If it's not empty, the store which receives the request will forward it to
// the forwarded host. It's useful when a network partition occurs.
ForwardedHost string
}

// NewRequest returns new kv rpc request.
Expand All @@ -202,6 +207,22 @@ func NewReplicaReadRequest(typ CmdType, pointer interface{}, replicaReadType kv.
return req
}

// EnableStaleRead marks the request as a stale read: it sets StaleRead,
// switches ReplicaReadType to kv.ReplicaReadMixed, and clears ReplicaRead.
func (req *Request) EnableStaleRead() {
	req.ReplicaRead = false
	req.ReplicaReadType = kv.ReplicaReadMixed
	req.StaleRead = true
}

// IsDebugReq reports whether the request is a debug request. Currently the
// only debug request type is CmdDebugGetRegionProperties.
func (req *Request) IsDebugReq() bool {
	return req.Type == CmdDebugGetRegionProperties
}

// Get returns the GetRequest held in req.Req. The type assertion is unchecked,
// so it panics if req.Req is not a *kvrpcpb.GetRequest.
func (req *Request) Get() *kvrpcpb.GetRequest {
return req.Req.(*kvrpcpb.GetRequest)
}
Expand Down Expand Up @@ -397,13 +418,6 @@ func (req *Request) TxnHeartBeat() *kvrpcpb.TxnHeartBeatRequest {
return req.Req.(*kvrpcpb.TxnHeartBeatRequest)
}

// EnableStaleRead marks the request as a stale read: it sets StaleRead,
// switches ReplicaReadType to kv.ReplicaReadMixed, and clears ReplicaRead.
func (req *Request) EnableStaleRead() {
	req.ReplicaRead = false
	req.ReplicaReadType = kv.ReplicaReadMixed
	req.StaleRead = true
}

// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request {
switch req.Type {
Expand Down Expand Up @@ -463,15 +477,6 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return nil
}

// IsDebugReq reports whether the request is a debug request. Currently the
// only debug request type is CmdDebugGetRegionProperties.
func (req *Request) IsDebugReq() bool {
	return req.Type == CmdDebugGetRegionProperties
}

// Response wraps all kv/coprocessor responses.
type Response struct {
Resp interface{}
Expand Down

0 comments on commit 77713d2

Please sign in to comment.