Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support coprocessor cache for paging protocol #35787

Merged
merged 9 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions executor/copr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ func TestIntegrationCopCache(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("create table t (a int primary key)")

// TODO(tiancaiamao) update the test and support cop cache for paging.
tk.MustExec("set @@tidb_enable_paging = off")

tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tid := tblInfo.Meta().ID
Expand Down
3 changes: 0 additions & 3 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,9 +822,6 @@ func TestLimitIndexEstimation(t *testing.T) {
SQL string
Plan []string
}
// When paging is used, there is a 'paging:true' makes the explain output differ.
// IndexLookUp 0.00 root paging:true
tk.MustExec("set @@tidb_enable_paging = off")

analyzeSuiteData := core.GetAnalyzeSuiteData()
analyzeSuiteData.GetTestCases(t, &input, &output)
Expand Down
17 changes: 0 additions & 17 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ func TestPushLimitDownIndexLookUpReader(t *testing.T) {
tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustExec("analyze table tbl")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -3685,9 +3682,6 @@ func TestExtendedStatsSwitch(t *testing.T) {
"1.000000 1",
))

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

// Estimated index scan count is 4 using extended stats.
tk.MustQuery("explain format = 'brief' select * from t use index(b) where a > 3 order by b limit 1").Check(testkit.Rows(
"Limit 1.00 root offset:0, count:1",
Expand Down Expand Up @@ -4557,9 +4551,6 @@ func TestLimitIndexLookUpKeepOrder(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int, b int, c int, d int, index idx(a,b,c));")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -4786,9 +4777,6 @@ func TestMultiColMaxOneRow(t *testing.T) {
tk.MustExec("create table t1(a int)")
tk.MustExec("create table t2(a int, b int, c int, primary key(a,b))")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -5561,8 +5549,6 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) {

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")
// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
Expand Down Expand Up @@ -5602,9 +5588,6 @@ func TestIssue27083(t *testing.T) {
require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table t")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down
4 changes: 0 additions & 4 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,6 @@ func TestEliminateMaxOneRow(t *testing.T) {
tk.MustExec("create table t2(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL)")
tk.MustExec("create table t3(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, c int(11) DEFAULT NULL, UNIQUE KEY idx_abc (a, b, c))")

// When paging is used, there is a 'paging:true' makes the explain output differ.
// IndexLookUp 0.00 root paging:true
tk.MustExec("set @@tidb_enable_paging = off")

for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
Expand Down
2 changes: 1 addition & 1 deletion planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ func calcPagingCost(ctx sessionctx.Context, indexPlan PhysicalPlan, expectCnt ui

// we want the diff between idxCst and pagingCst here,
// however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed
return pagingCst - sessVars.GetSeekFactor(nil)
return math.Max(pagingCst-sessVars.GetSeekFactor(nil), 0)
}

func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask {
Expand Down
32 changes: 27 additions & 5 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,15 +712,15 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
var cacheKey []byte
var cacheValue *coprCacheValue

// TODO: cache paging copr
// If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since
// computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key.
if task.cmdType == tikvrpc.CmdCop && !task.paging && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
cKey, err := coprCacheBuildKey(&copReq)
if err == nil {
cacheKey = cKey
cValue := worker.store.coprCache.Get(cKey)
copReq.IsCacheEnabled = true

if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs {
// Append cache version to the request to skip Coprocessor computation if possible
// when request result is cached
Expand Down Expand Up @@ -779,7 +779,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
metrics.TiKVCoprocessorHistogram.WithLabelValues(storeID, strconv.FormatBool(staleRead)).Observe(costTime.Seconds())

if worker.req.Paging {
return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, costTime)
return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, costTime)
}

// Handles the response for non-paging copTask.
Expand Down Expand Up @@ -848,8 +848,8 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, nil, nil, task, ch, nil, costTime)
func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, cacheKey, cacheValue, task, ch, nil, costTime)
if err != nil || len(remainedTasks) != 0 {
// If there is region error or lock error, keep the paging size and retry.
for _, remainedTask := range remainedTasks {
Expand Down Expand Up @@ -954,6 +954,22 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
data := make([]byte, len(cacheValue.Data))
copy(data, cacheValue.Data)
resp.pbResp.Data = data
if worker.req.Paging {
var start, end []byte
if cacheValue.PageStart != nil {
start = make([]byte, len(cacheValue.PageStart))
copy(start, cacheValue.PageStart)
}
if cacheValue.PageEnd != nil {
end = make([]byte, len(cacheValue.PageEnd))
copy(end, cacheValue.PageEnd)
}
// When paging protocol is used, the response key range is part of the cache data.
resp.pbResp.Range = &coprocessor.KeyRange{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set resp.pbResp.Range = nil when cacheValue.PageStart/PageEnd == nil?

Start: start,
End: end,
}
}
resp.detail.CoprCacheHit = true
} else {
// Cache not hit or cache hit but not valid: update the cache if the response can be cached.
Expand All @@ -969,6 +985,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
RegionID: task.region.GetID(),
RegionDataVersion: resp.pbResp.CacheLastVersion,
}
// When paging protocol is used, the response key range is part of the cache data.
if r := resp.pbResp.GetRange(); r != nil {
newCacheValue.PageStart = append([]byte{}, r.GetStart()...)
newCacheValue.PageEnd = append([]byte{}, r.GetEnd()...)
}

worker.store.coprCache.Set(cacheKey, &newCacheValue)
}
}
Expand Down
14 changes: 13 additions & 1 deletion store/copr/coprocessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type coprCacheValue struct {
TimeStamp uint64
RegionID uint64
RegionDataVersion uint64

// Used in coprocessor paging protocol
PageStart []byte
PageEnd []byte
}

func (v *coprCacheValue) String() string {
Expand All @@ -54,7 +58,7 @@ func (v *coprCacheValue) String() string {
const coprCacheValueSize = int(unsafe.Sizeof(coprCacheValue{}))

func (v *coprCacheValue) Len() int {
return coprCacheValueSize + len(v.Key) + len(v.Data)
return coprCacheValueSize + len(v.Key) + len(v.Data) + len(v.PageStart) + len(v.PageEnd)
}

func newCoprCache(config *config.CoprocessorCache) (*coprCache, error) {
Expand Down Expand Up @@ -108,6 +112,9 @@ func coprCacheBuildKey(copReq *coprocessor.Request) ([]byte, error) {
}
totalLength += 2 + len(r.Start) + 2 + len(r.End)
}
if copReq.PagingSize > 0 {
totalLength += 1
}

key := make([]byte, totalLength)

Expand Down Expand Up @@ -141,6 +148,11 @@ func coprCacheBuildKey(copReq *coprocessor.Request) ([]byte, error) {
dest += len(r.End)
}

// 1 byte when use paging protocol
if copReq.PagingSize > 0 {
key[dest] = 1
}

return key, nil
}

Expand Down
16 changes: 13 additions & 3 deletions store/copr/coprocessor_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func TestCacheValueLen(t *testing.T) {
RegionID: 0x1,
RegionDataVersion: 0x3,
}
// 72 = (8 byte pointer + 8 byte for length + 8 byte for cap) * 2 + 8 byte * 3
require.Equal(t, 72, v.Len())
// 120 = (8 byte pointer + 8 byte for length + 8 byte for cap) * 4 + 8 byte * 3
require.Equal(t, 120, v.Len())

v = coprCacheValue{
Key: []byte("foobar"),
Expand All @@ -165,7 +165,17 @@ func TestCacheValueLen(t *testing.T) {
RegionID: 0x1,
RegionDataVersion: 0x3,
}
require.Equal(t, 72+len(v.Key)+len(v.Data), v.Len())
require.Equal(t, 120+len(v.Key)+len(v.Data), v.Len())

v = coprCacheValue{
Key: []byte("foobar"),
Data: []byte("12345678"),
TimeStamp: 0x123,
RegionID: 0x1,
RegionDataVersion: 0x3,
PageEnd: []byte("3235"),
}
require.Equal(t, 120+len(v.Key)+len(v.Data)+len(v.PageEnd), v.Len())
}

func TestGetSet(t *testing.T) {
Expand Down