Skip to content

Commit

Permalink
copr: split copr requests by buckets (#32818)
Browse files Browse the repository at this point in the history
close #32968
  • Loading branch information
youjiali1995 authored Mar 11, 2022
1 parent 3226aab commit 328a8bc
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 77 deletions.
6 changes: 3 additions & 3 deletions br/tests/br_key_locked/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ type codecPDClient struct {

// GetRegion encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
encodedKey := codec.EncodeBytes(nil, key)
region, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
encodedKey := codec.EncodeBytes(nil, key)
region, err := c.Client.GetPrevRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

// GetRegionByID encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) {
region, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/chunk_size_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func manipulateCluster(cluster testutils.Cluster, splitKeys [][]byte) []uint64 {
if len(splitKeys) == 0 {
return nil
}
region, _ := cluster.GetRegionByKey(splitKeys[0])
region, _, _ := cluster.GetRegionByKey(splitKeys[0])
for _, key := range splitKeys {
if r, _ := cluster.GetRegionByKey(key); r.Id != region.Id {
if r, _, _ := cluster.GetRegionByKey(key); r.Id != region.Id {
panic("all split keys should belong to the same region")
}
}
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestCopClientSend(t *testing.T) {

// Split one region.
key := tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(500))
region, _ := cluster.GetRegionByKey(key)
region, _, _ := cluster.GetRegionByKey(key)
peerID := cluster.AllocID()
cluster.Split(region.GetId(), cluster.AllocID(), key, []uint64{peerID}, peerID)

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220303073211-00fea37feb66
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20220215045702-d229fcc888c8
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb-tools v6.0.0-alpha.0.20220309081549-563c2a342f9c+incompatible
Expand All @@ -69,8 +69,8 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20220302130440-46d649305eee
github.com/tikv/pd/client v0.0.0-20220216080339-1b8f82378ee7
github.com/tikv/client-go/v2 v2.0.1-0.20220309055239-7e34d88af3b5
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand Down
15 changes: 7 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,9 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220106070556-3fa8fa04f898/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220215045702-d229fcc888c8 h1:8VizThgCQJTVwCrfp5JOsiUdpvB8F9nsUcLrnHqjpNY=
github.com/pingcap/kvproto v0.0.0-20220215045702-d229fcc888c8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27 h1:+Ax2NXyAFIITrzgSPWBo3SBZtX/D60VeELCG0B0hqiY=
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -684,11 +684,10 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20220302130440-46d649305eee h1:+cH2vCSzrDT0yWTZG40g3htNP5u+9pcb2qD/1OMcWnI=
github.com/tikv/client-go/v2 v2.0.1-0.20220302130440-46d649305eee/go.mod h1:gaHSp8rnxZ0w36qb6QPPNPh9P0Mu5vAEwCQcc0Brni4=
github.com/tikv/pd/client v0.0.0-20220216070739-26c668271201/go.mod h1:fEvI5fhAuJn1Fn87VJF8ByE9Vc16EzWGoePZB21/nL8=
github.com/tikv/pd/client v0.0.0-20220216080339-1b8f82378ee7 h1:RRiYmyzHgTgV0mrp6Ue6cGn0EAB7U1YHnEfVWEGCKk8=
github.com/tikv/pd/client v0.0.0-20220216080339-1b8f82378ee7/go.mod h1:fEvI5fhAuJn1Fn87VJF8ByE9Vc16EzWGoePZB21/nL8=
github.com/tikv/client-go/v2 v2.0.1-0.20220309055239-7e34d88af3b5 h1:IQOLWIl4VN9Li7EzeyCUGepca9cRU+K02eN3yBEuSIs=
github.com/tikv/client-go/v2 v2.0.1-0.20220309055239-7e34d88af3b5/go.mod h1:0scaG+seu7L56apm+Gjz9vckyO7ABIzM6T7n00mrIXs=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ=
Expand Down
4 changes: 2 additions & 2 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,10 @@ func (s *testPessimisticSuite) TestSingleStatementRollback(c *C) {
tableStart := tablecodec.GenTableRecordPrefix(tblID)
s.cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 2)
region1Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(1)))
region1, _ := s.cluster.GetRegionByKey(region1Key)
region1, _, _ := s.cluster.GetRegionByKey(region1Key)
region1ID := region1.Id
region2Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(3)))
region2, _ := s.cluster.GetRegionByKey(region2Key)
region2, _, _ := s.cluster.GetRegionByKey(region2Key)
region2ID := region2.Id

syncCh := make(chan bool)
Expand Down
14 changes: 9 additions & 5 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa

// copTask contains a related Region and KeyRange for a kv.Request.
type copTask struct {
region tikv.RegionVerID
ranges *KeyRanges
region tikv.RegionVerID
bucketsVer uint64
ranges *KeyRanges

respChan chan *copResponse
storeAddr string
Expand Down Expand Up @@ -180,7 +181,8 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv

rangesLen := ranges.Len()

locs, err := cache.SplitKeyRangesByLocations(bo, ranges)
// TODO(youjiali1995): is there any request type that needn't be splitted by buckets?
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -208,6 +210,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
bucketsVer: loc.getBucketVersion(),
ranges: loc.Ranges.Slice(i, nextI),
respChan: make(chan *copResponse, chanSize),
cmdType: cmdType,
Expand Down Expand Up @@ -319,7 +322,6 @@ type copIteratorWorker struct {

replicaReadSeed uint32

actionOnExceed *rateLimitAction
enableCollectExecutionInfo bool
}

Expand Down Expand Up @@ -441,7 +443,6 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC
kvclient: txnsnapshot.NewClientHelper(it.store.store, &it.resolvedLocks, &it.committedLocks, false),
memTracker: it.memTracker,
replicaReadSeed: it.replicaReadSeed,
actionOnExceed: it.actionOnExceed,
enableCollectExecutionInfo: enableCollectExecutionInfo,
}
go worker.run(ctx)
Expand Down Expand Up @@ -931,6 +932,9 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
if ver := resp.pbResp.GetLatestBucketsVersion(); task.bucketsVer < ver {
worker.store.GetRegionCache().UpdateBucketsIfNeeded(task.region, ver)
}
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if rpcCtx != nil && task.storeType == kv.TiDB {
resp.err = errors.Errorf("error: %v", regionErr)
Expand Down
Loading

0 comments on commit 328a8bc

Please sign in to comment.