Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

copr: split copr requests by buckets #32818

Merged
merged 14 commits into from
Mar 11, 2022
Merged
6 changes: 3 additions & 3 deletions br/tests/br_key_locked/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ type codecPDClient struct {

// GetRegion encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
encodedKey := codec.EncodeBytes(nil, key)
region, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
encodedKey := codec.EncodeBytes(nil, key)
region, err := c.Client.GetPrevRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

// GetRegionByID encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) {
region, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/chunk_size_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func manipulateCluster(cluster testutils.Cluster, splitKeys [][]byte) []uint64 {
if len(splitKeys) == 0 {
return nil
}
region, _ := cluster.GetRegionByKey(splitKeys[0])
region, _, _ := cluster.GetRegionByKey(splitKeys[0])
for _, key := range splitKeys {
if r, _ := cluster.GetRegionByKey(key); r.Id != region.Id {
if r, _, _ := cluster.GetRegionByKey(key); r.Id != region.Id {
panic("all split keys should belong to the same region")
}
}
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestCopClientSend(t *testing.T) {

// Split one region.
key := tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(500))
region, _ := cluster.GetRegionByKey(key)
region, _, _ := cluster.GetRegionByKey(key)
peerID := cluster.AllocID()
cluster.Split(region.GetId(), cluster.AllocID(), key, []uint64{peerID}, peerID)

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220303073211-00fea37feb66
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20220215045702-d229fcc888c8
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb-tools v6.0.0-alpha.0.20220309081549-563c2a342f9c+incompatible
Expand All @@ -69,8 +69,8 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20220302130440-46d649305eee
github.com/tikv/pd/client v0.0.0-20220216080339-1b8f82378ee7
github.com/tikv/client-go/v2 v2.0.1-0.20220309055239-7e34d88af3b5
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand Down
15 changes: 7 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,9 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220106070556-3fa8fa04f898/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220215045702-d229fcc888c8 h1:8VizThgCQJTVwCrfp5JOsiUdpvB8F9nsUcLrnHqjpNY=
github.com/pingcap/kvproto v0.0.0-20220215045702-d229fcc888c8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27 h1:+Ax2NXyAFIITrzgSPWBo3SBZtX/D60VeELCG0B0hqiY=
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -684,11 +684,10 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20220302130440-46d649305eee h1:+cH2vCSzrDT0yWTZG40g3htNP5u+9pcb2qD/1OMcWnI=
github.com/tikv/client-go/v2 v2.0.1-0.20220302130440-46d649305eee/go.mod h1:gaHSp8rnxZ0w36qb6QPPNPh9P0Mu5vAEwCQcc0Brni4=
github.com/tikv/pd/client v0.0.0-20220216070739-26c668271201/go.mod h1:fEvI5fhAuJn1Fn87VJF8ByE9Vc16EzWGoePZB21/nL8=
github.com/tikv/pd/client v0.0.0-20220216080339-1b8f82378ee7 h1:RRiYmyzHgTgV0mrp6Ue6cGn0EAB7U1YHnEfVWEGCKk8=
github.com/tikv/pd/client v0.0.0-20220216080339-1b8f82378ee7/go.mod h1:fEvI5fhAuJn1Fn87VJF8ByE9Vc16EzWGoePZB21/nL8=
github.com/tikv/client-go/v2 v2.0.1-0.20220309055239-7e34d88af3b5 h1:IQOLWIl4VN9Li7EzeyCUGepca9cRU+K02eN3yBEuSIs=
github.com/tikv/client-go/v2 v2.0.1-0.20220309055239-7e34d88af3b5/go.mod h1:0scaG+seu7L56apm+Gjz9vckyO7ABIzM6T7n00mrIXs=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ=
Expand Down
4 changes: 2 additions & 2 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,10 @@ func (s *testPessimisticSuite) TestSingleStatementRollback(c *C) {
tableStart := tablecodec.GenTableRecordPrefix(tblID)
s.cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 2)
region1Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(1)))
region1, _ := s.cluster.GetRegionByKey(region1Key)
region1, _, _ := s.cluster.GetRegionByKey(region1Key)
region1ID := region1.Id
region2Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(3)))
region2, _ := s.cluster.GetRegionByKey(region2Key)
region2, _, _ := s.cluster.GetRegionByKey(region2Key)
region2ID := region2.Id

syncCh := make(chan bool)
Expand Down
14 changes: 9 additions & 5 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa

// copTask contains a related Region and KeyRange for a kv.Request.
type copTask struct {
region tikv.RegionVerID
ranges *KeyRanges
region tikv.RegionVerID
bucketsVer uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will it be used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget it... Now it's used for reducing UpdateBucketsIfNeeded 0b24a84#diff-23eb1043649369d5956b123b173b799ded8c6b7cbf4519bb32cb3caa01928d75R935.

ranges *KeyRanges

respChan chan *copResponse
storeAddr string
Expand Down Expand Up @@ -180,7 +181,8 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv

rangesLen := ranges.Len()

locs, err := cache.SplitKeyRangesByLocations(bo, ranges)
// TODO(youjiali1995): is there any request type that needn't be splitted by buckets?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the split affect some internal work such as br cheksum, data analyze or statistic update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they should be splitted by buckets too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does TiFlash ignore buckets?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. TiFlash has implemented server-side streaming by BatchCoprocessor which splits ranges by segments.

locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -208,6 +210,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
bucketsVer: loc.getBucketVersion(),
ranges: loc.Ranges.Slice(i, nextI),
respChan: make(chan *copResponse, chanSize),
cmdType: cmdType,
Expand Down Expand Up @@ -319,7 +322,6 @@ type copIteratorWorker struct {

replicaReadSeed uint32

actionOnExceed *rateLimitAction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it confirmed to be never used in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, but it's not used now.

enableCollectExecutionInfo bool
}

Expand Down Expand Up @@ -441,7 +443,6 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC
kvclient: txnsnapshot.NewClientHelper(it.store.store, &it.resolvedLocks, &it.committedLocks, false),
memTracker: it.memTracker,
replicaReadSeed: it.replicaReadSeed,
actionOnExceed: it.actionOnExceed,
enableCollectExecutionInfo: enableCollectExecutionInfo,
}
go worker.run(ctx)
Expand Down Expand Up @@ -931,6 +932,9 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
if ver := resp.pbResp.GetLatestBucketsVersion(); task.bucketsVer < ver {
worker.store.GetRegionCache().UpdateBucketsIfNeeded(task.region, ver)
}
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if rpcCtx != nil && task.storeType == kv.TiDB {
resp.err = errors.Errorf("error: %v", regionErr)
Expand Down
Loading