Skip to content

Commit

Permalink
store/tikv: remove canceled requests before sending
Browse files Browse the repository at this point in the history
Cherry-pick #10634
  • Loading branch information
coocood authored Jun 3, 2019
1 parent 5310ff5 commit 5927716
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 4 deletions.
8 changes: 4 additions & 4 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs in
}
select {
case <-time.After(time.Duration(realSleep) * time.Millisecond):
attempts++
lastSleep = sleep
return lastSleep
case <-ctx.Done():
return 0
}

attempts++
lastSleep = sleep
return lastSleep
}
}

Expand Down
25 changes: 25 additions & 0 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ type batchCommandsEntry struct {
err error
}

func (b *batchCommandsEntry) isCanceled() bool {
return atomic.LoadInt32(&b.canceled) == 1
}

// fetchAllPendingRequests fetches all pending requests from the channel.
func fetchAllPendingRequests(
ch chan *batchCommandsEntry,
Expand Down Expand Up @@ -454,6 +458,10 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
bestBatchWaitSize += 1
}

length = removeCanceledRequests(&entries, &requests)
if length == 0 {
continue // All requests are canceled.
}
maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length))
for i := 0; i < length; i++ {
requestID := uint64(i) + maxBatchID - uint64(length)
Expand Down Expand Up @@ -484,6 +492,23 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
}
}

// removeCanceledRequests removes canceled requests before sending.
func removeCanceledRequests(
entries *[]*batchCommandsEntry,
requests *[]*tikvpb.BatchCommandsRequest_Request) int {
validEntries := (*entries)[:0]
validRequets := (*requests)[:0]
for _, e := range *entries {
if !e.isCanceled() {
validEntries = append(validEntries, e)
validRequets = append(validRequets, e.req)
}
}
*entries = validEntries
*requests = validRequets
return len(*entries)
}

// rpcClient is RPC client struct.
// TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV.
// Since we use shared client connection to communicate to the same TiKV, it's possible
Expand Down
25 changes: 25 additions & 0 deletions store/tikv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
)

Expand Down Expand Up @@ -50,3 +51,27 @@ func (s *testClientSuite) TestConn(c *C) {
c.Assert(err, NotNil)
c.Assert(conn3, IsNil)
}

func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
req := new(tikvpb.BatchCommandsRequest_Request)
entries := []*batchCommandsEntry{
{canceled: 1, req: req},
{canceled: 0, req: req},
{canceled: 1, req: req},
{canceled: 1, req: req},
{canceled: 0, req: req},
}
entryPtr := &entries[0]
requests := make([]*tikvpb.BatchCommandsRequest_Request, len(entries))
for i := range entries {
requests[i] = entries[i].req
}
length := removeCanceledRequests(&entries, &requests)
c.Assert(length, Equals, 2)
for _, e := range entries {
c.Assert(e.isCanceled(), IsFalse)
}
c.Assert(len(requests), Equals, 2)
newEntryPtr := &entries[0]
c.Assert(entryPtr, Equals, newEntryPtr)
}

0 comments on commit 5927716

Please sign in to comment.