-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
copr: split copr requests by buckets #32818
Changes from all commits
e8e898a
ced3b50
340d8af
374f3a2
1ec3f08
9cc24d0
67db774
861bb76
108221b
dc0981c
46e273a
c4e2a99
923f13a
4f86e6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,8 +146,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa | |
|
||
// copTask contains a related Region and KeyRange for a kv.Request. | ||
type copTask struct { | ||
region tikv.RegionVerID | ||
ranges *KeyRanges | ||
region tikv.RegionVerID | ||
bucketsVer uint64 | ||
ranges *KeyRanges | ||
|
||
respChan chan *copResponse | ||
storeAddr string | ||
|
@@ -180,7 +181,8 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv | |
|
||
rangesLen := ranges.Len() | ||
|
||
locs, err := cache.SplitKeyRangesByLocations(bo, ranges) | ||
// TODO(youjiali1995): is there any request type that needn't be splitted by buckets? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will the split affect some internal work such as br cheksum, data analyze or statistic update? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, they should be splitted by buckets too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does TiFlash ignore buckets? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. TiFlash has implemented server-side streaming by |
||
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
@@ -208,6 +210,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv | |
} | ||
tasks = append(tasks, &copTask{ | ||
region: loc.Location.Region, | ||
bucketsVer: loc.getBucketVersion(), | ||
ranges: loc.Ranges.Slice(i, nextI), | ||
respChan: make(chan *copResponse, chanSize), | ||
cmdType: cmdType, | ||
|
@@ -319,7 +322,6 @@ type copIteratorWorker struct { | |
|
||
replicaReadSeed uint32 | ||
|
||
actionOnExceed *rateLimitAction | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it confirmed to be never used in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know, but it's not used now. |
||
enableCollectExecutionInfo bool | ||
} | ||
|
||
|
@@ -441,7 +443,6 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC | |
kvclient: txnsnapshot.NewClientHelper(it.store.store, &it.resolvedLocks, &it.committedLocks, false), | ||
memTracker: it.memTracker, | ||
replicaReadSeed: it.replicaReadSeed, | ||
actionOnExceed: it.actionOnExceed, | ||
enableCollectExecutionInfo: enableCollectExecutionInfo, | ||
} | ||
go worker.run(ctx) | ||
|
@@ -931,6 +932,9 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti | |
// if we're handling streaming coprocessor response, lastRange is the range of last | ||
// successful response, otherwise it's nil. | ||
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) { | ||
if ver := resp.pbResp.GetLatestBucketsVersion(); task.bucketsVer < ver { | ||
worker.store.GetRegionCache().UpdateBucketsIfNeeded(task.region, ver) | ||
} | ||
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil { | ||
if rpcCtx != nil && task.storeType == kv.TiDB { | ||
resp.err = errors.Errorf("error: %v", regionErr) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will it be used for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forget it... Now it's used for reducing UpdateBucketsIfNeeded 0b24a84#diff-23eb1043649369d5956b123b173b799ded8c6b7cbf4519bb32cb3caa01928d75R935.