Skip to content

Commit

Permalink
store/tikv: set different timeout for tikv requests. (pingcap#1718)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Sep 13, 2016
1 parent 4c9f291 commit 3899c95
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 49 deletions.
27 changes: 17 additions & 10 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv

import (
"sync/atomic"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
Expand All @@ -31,14 +32,18 @@ type Client interface {
// Close should release all data.
Close() error
// SendKVReq sends kv request.
SendKVReq(addr string, req *kvrpcpb.Request) (*kvrpcpb.Response, error)
SendKVReq(addr string, req *kvrpcpb.Request, timeout time.Duration) (*kvrpcpb.Response, error)
// SendCopReq sends coprocessor request.
SendCopReq(addr string, req *coprocessor.Request) (*coprocessor.Response, error)
SendCopReq(addr string, req *coprocessor.Request, timeout time.Duration) (*coprocessor.Response, error)
}

const (
maxConnecion = 150
netTimeout = 20 // seconds
maxConnecion = 150
dialTimeout = 5 * time.Second
writeTimeout = 10 * time.Second
readTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
readTimeoutMedium = 60 * time.Second // For requests that may need scan region.
readTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
)

type rpcClient struct {
Expand All @@ -50,13 +55,13 @@ func newRPCClient() *rpcClient {
return &rpcClient{
msgID: 0,
p: NewPools(maxConnecion, func(addr string) (*Conn, error) {
return NewConnection(addr, netTimeout)
return NewConnection(addr, dialTimeout)
}),
}
}

// SendCopReq sends a Request to co-processor and receives Response.
func (c *rpcClient) SendCopReq(addr string, req *coprocessor.Request) (*coprocessor.Response, error) {
func (c *rpcClient) SendCopReq(addr string, req *coprocessor.Request, timeout time.Duration) (*coprocessor.Response, error) {
conn, err := c.p.GetConn(addr)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -66,7 +71,7 @@ func (c *rpcClient) SendCopReq(addr string, req *coprocessor.Request) (*coproces
MsgType: msgpb.MessageType_CopReq,
CopReq: req,
}
err = c.doSend(conn, &msg)
err = c.doSend(conn, &msg, writeTimeout, timeout)
if err != nil {
conn.Close()
return nil, errors.Trace(err)
Expand All @@ -79,7 +84,7 @@ func (c *rpcClient) SendCopReq(addr string, req *coprocessor.Request) (*coproces
}

// SendKVReq sends a Request to kv server and receives Response.
func (c *rpcClient) SendKVReq(addr string, req *kvrpcpb.Request) (*kvrpcpb.Response, error) {
func (c *rpcClient) SendKVReq(addr string, req *kvrpcpb.Request, timeout time.Duration) (*kvrpcpb.Response, error) {
conn, err := c.p.GetConn(addr)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -89,7 +94,7 @@ func (c *rpcClient) SendKVReq(addr string, req *kvrpcpb.Request) (*kvrpcpb.Respo
MsgType: msgpb.MessageType_KvReq,
KvReq: req,
}
err = c.doSend(conn, &msg)
err = c.doSend(conn, &msg, writeTimeout, timeout)
if err != nil {
conn.Close()
return nil, errors.Trace(err)
Expand All @@ -101,15 +106,17 @@ func (c *rpcClient) SendKVReq(addr string, req *kvrpcpb.Request) (*kvrpcpb.Respo
return msg.GetKvResp(), nil
}

func (c *rpcClient) doSend(conn *Conn, msg *msgpb.Message) error {
func (c *rpcClient) doSend(conn *Conn, msg *msgpb.Message, writeTimeout time.Duration, readTimeout time.Duration) error {
curMsgID := atomic.AddUint64(&c.msgID, 1)
log.Debugf("Send request msgID[%d] type[%v]", curMsgID, msg.GetMsgType())
conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := util.WriteMessage(conn, curMsgID, msg); err != nil {
return errors.Trace(err)
}
if err := conn.Flush(); err != nil {
return errors.Trace(err)
}
conn.SetReadDeadline(time.Now().Add(readTimeout))
msgID, err := util.ReadMessage(conn.BufioReader(), msg)
if err != nil {
return errors.Trace(err)
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *testClientSuite) TestSendBySelf(c *C) {
ver := uint64(0)
getReq.Version = ver
req.CmdGetReq = getReq
resp, err := cli.SendKVReq(":61234", req)
resp, err := cli.SendKVReq(":61234", req, readTimeoutShort)
c.Assert(err, IsNil)
c.Assert(req.GetType(), Equals, resp.GetType())
}
Expand All @@ -84,7 +84,7 @@ func (s *testClientSuite) TestRetryClose(c *C) {
defer l.Close()
cli := newRPCClient()
req := new(pb.Request)
resp, err := cli.SendKVReq(":61235", req)
resp, err := cli.SendKVReq(":61235", req, readTimeoutShort)
c.Assert(err, NotNil)
c.Assert(resp, IsNil)
}
Expand All @@ -106,7 +106,7 @@ func (s *testClientSuite) TestRetryReadThenClose(c *C) {
cli := newRPCClient()
req := new(pb.Request)
req.Type = pb.MessageType_CmdGet
resp, err := cli.SendKVReq(":61236", req)
resp, err := cli.SendKVReq(":61236", req, readTimeoutShort)
c.Assert(err, NotNil)
c.Assert(resp, IsNil)
}
Expand Down Expand Up @@ -137,9 +137,9 @@ func (s *testClientSuite) TestWrongMessageID(c *C) {
Type: pb.MessageType_CmdGet,
}
// Wrong ID for the first request, correct for the rest.
_, err := cli.SendKVReq(":61237", req)
_, err := cli.SendKVReq(":61237", req, readTimeoutShort)
c.Assert(err, NotNil)
resp, err := cli.SendKVReq(":61237", req)
resp, err := cli.SendKVReq(":61237", req, readTimeoutShort)
c.Assert(err, IsNil)
c.Assert(resp.GetType(), Equals, req.GetType())
}
Expand Down
45 changes: 26 additions & 19 deletions store/tikv/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,37 @@ import (
"time"

"github.com/juju/errors"
"github.com/ngaut/deadline"
)

const defaultBufSize = 4 * 1024

// Conn is a simple wrapper of net.Conn.
type Conn struct {
addr string
nc net.Conn
closed bool
r *bufio.Reader
w *bufio.Writer
netTimeout int //second
addr string
nc net.Conn
closed bool
r *bufio.Reader
w *bufio.Writer
}

// NewConnection creates a Conn with network timeout..
func NewConnection(addr string, netTimeout int) (*Conn, error) {
return NewConnectionWithSize(addr, netTimeout, defaultBufSize, defaultBufSize)
// NewConnection creates a Conn with dial timeout.
func NewConnection(addr string, dialTimeout time.Duration) (*Conn, error) {
return NewConnectionWithSize(addr, dialTimeout, defaultBufSize, defaultBufSize)
}

// NewConnectionWithSize creates a Conn with network timeout and read/write buffer size.
func NewConnectionWithSize(addr string, netTimeout int, readSize int, writeSize int) (*Conn, error) {
conn, err := net.DialTimeout("tcp", addr, time.Duration(netTimeout)*time.Second)
// NewConnectionWithSize creates a Conn with dial timeout and read/write buffer size.
func NewConnectionWithSize(addr string, dialTimeout time.Duration, readSize int, writeSize int) (*Conn, error) {
conn, err := net.DialTimeout("tcp", addr, dialTimeout)
if err != nil {
return nil, errors.Trace(err)
}

return &Conn{
addr: addr,
nc: conn,
closed: false,
r: bufio.NewReaderSize(deadline.NewDeadlineReader(conn, time.Duration(netTimeout)*time.Second), readSize),
w: bufio.NewWriterSize(deadline.NewDeadlineWriter(conn, time.Duration(netTimeout)*time.Second), writeSize),
netTimeout: netTimeout,
addr: addr,
nc: conn,
closed: false,
r: bufio.NewReaderSize(conn, readSize),
w: bufio.NewWriterSize(conn, writeSize),
}, nil
}

Expand All @@ -74,6 +71,16 @@ func (c *Conn) BufioReader() *bufio.Reader {
return c.r
}

// SetReadDeadline applies t as the read deadline of the wrapped net.Conn.
// The Conn's buffered reader is layered on that same net.Conn, so the
// deadline governs reads that have to go to the network. Whatever error the
// underlying connection reports is returned unchanged.
func (c *Conn) SetReadDeadline(t time.Time) error {
	underlying := c.nc
	return underlying.SetReadDeadline(t)
}

// SetWriteDeadline applies t as the write deadline of the wrapped net.Conn.
// The Conn's buffered writer is layered on that same net.Conn, so the
// deadline governs writes that actually reach the network. Whatever error the
// underlying connection reports is returned unchanged.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	underlying := c.nc
	return underlying.SetWriteDeadline(t)
}

// Close closes the net.Conn.
func (c *Conn) Close() {
if c.closed {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (it *copIterator) handleTask(bo *Backoffer, task *copTask) (*coprocessor.Re
Data: it.req.Data,
Ranges: task.ranges.toPBRanges(),
}
resp, err := it.store.client.SendCopReq(task.region.GetAddress(), req)
resp, err := it.store.client.SendCopReq(task.region.GetAddress(), req, readTimeoutMedium)
if err != nil {
it.store.regionCache.NextPeer(task.region.VerID())
err = bo.Backoff(boTiKVRPC, err)
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (w *GCWorker) resolveLocks(safePoint uint64) error {
if err != nil {
return errors.Trace(err)
}
resp, err := w.store.SendKVReq(bo, req, region.VerID())
resp, err := w.store.SendKVReq(bo, req, region.VerID(), readTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (w *GCWorker) doGC(safePoint uint64) error {
if err != nil {
return errors.Trace(err)
}
resp, err := w.store.SendKVReq(bo, req, region.VerID())
resp, err := w.store.SendKVReq(bo, req, region.VerID(), readTimeoutLong)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) {
// SendKVReq sends req to tikv server. It will retry internally to find the right
// region leader if i) it fails to establish a connection to server or ii) server
// returns `NotLeader`.
func (s *tikvStore) SendKVReq(bo *Backoffer, req *pb.Request, regionID RegionVerID) (*pb.Response, error) {
func (s *tikvStore) SendKVReq(bo *Backoffer, req *pb.Request, regionID RegionVerID, timeout time.Duration) (*pb.Response, error) {
for {
region := s.regionCache.GetRegionByVerID(regionID)
if region == nil {
Expand All @@ -186,7 +186,7 @@ func (s *tikvStore) SendKVReq(bo *Backoffer, req *pb.Request, regionID RegionVer
}, nil
}
req.Context = region.GetContext()
resp, err := s.client.SendKVReq(region.GetAddress(), req)
resp, err := s.client.SendKVReq(region.GetAddress(), req, timeout)
if err != nil {
s.regionCache.NextPeer(region.VerID())
err = bo.Backoff(boTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %s, try next peer later", err, req.Context))
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
if err != nil {
return status, errors.Trace(err)
}
resp, err := lr.store.SendKVReq(bo, req, region.VerID())
resp, err := lr.store.SendKVReq(bo, req, region.VerID(), readTimeoutShort)
if err != nil {
return status, errors.Trace(err)
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status txnStatus, cl
if status.isCommitted() {
req.GetCmdResolveLockReq().CommitVersion = status.commitTS()
}
resp, err := lr.store.SendKVReq(bo, req, region.VerID())
resp, err := lr.store.SendKVReq(bo, req, region.VerID(), readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 4 additions & 2 deletions store/tikv/mock-tikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package mocktikv

import (
"time"

"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
Expand Down Expand Up @@ -322,7 +324,7 @@ type RPCClient struct {
}

// SendKVReq sends a kv request to mock cluster.
func (c *RPCClient) SendKVReq(addr string, req *kvrpcpb.Request) (*kvrpcpb.Response, error) {
func (c *RPCClient) SendKVReq(addr string, req *kvrpcpb.Request, timeout time.Duration) (*kvrpcpb.Response, error) {
store := c.cluster.GetStoreByAddr(addr)
if store == nil {
return nil, errors.New("connect fail")
Expand All @@ -332,7 +334,7 @@ func (c *RPCClient) SendKVReq(addr string, req *kvrpcpb.Request) (*kvrpcpb.Respo
}

// SendCopReq sends a coprocessor request to mock cluster.
func (c *RPCClient) SendCopReq(addr string, req *coprocessor.Request) (*coprocessor.Response, error) {
func (c *RPCClient) SendCopReq(addr string, req *coprocessor.Request, timeout time.Duration) (*coprocessor.Response, error) {
store := c.cluster.GetStoreByAddr(addr)
if store == nil {
return nil, errors.New("connect fail")
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
Version: s.startTS(),
},
}
resp, err := s.snapshot.store.SendKVReq(bo, req, region.VerID())
resp, err := s.snapshot.store.SendKVReq(bo, req, region.VerID(), readTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
Version: s.version.Ver,
},
}
resp, err := s.store.SendKVReq(bo, req, batch.region)
resp, err := s.store.SendKVReq(bo, req, batch.region, readTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
resp, err := s.store.SendKVReq(bo, req, region.VerID())
resp, err := s.store.SendKVReq(bo, req, region.VerID(), readTimeoutShort)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/txn_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *txnCommitter) prewriteSingleRegion(bo *Backoffer, batch batchKeys) erro
}

for {
resp, err := c.store.SendKVReq(bo, req, batch.region)
resp, err := c.store.SendKVReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (c *txnCommitter) commitSingleRegion(bo *Backoffer, batch batchKeys) error
},
}

resp, err := c.store.SendKVReq(bo, req, batch.region)
resp, err := c.store.SendKVReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func (c *txnCommitter) cleanupSingleRegion(bo *Backoffer, batch batchKeys) error
StartVersion: c.startTS,
},
}
resp, err := c.store.SendKVReq(bo, req, batch.region)
resp, err := c.store.SendKVReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 3899c95

Please sign in to comment.