From 3393cf96a3806be673e333c39e645e6b4005491f Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Tue, 15 Mar 2022 18:35:52 +0800 Subject: [PATCH] *: support mpp partition for tiflash (#31043) close pingcap/tidb#32347 --- distsql/request_builder.go | 6 + executor/builder.go | 9 +- executor/mpp_gather.go | 6 +- executor/partition_table.go | 12 +- executor/table_reader.go | 69 ++++++-- executor/tiflash_test.go | 62 ++++++- go.mod | 6 +- go.sum | 11 +- kv/kv.go | 9 + kv/mpp.go | 4 + planner/core/explain.go | 4 + planner/core/fragment.go | 67 +++++--- planner/core/integration_test.go | 38 ++++ planner/core/physical_plans.go | 13 ++ planner/core/plan_to_pb.go | 11 ++ .../core/testdata/integration_suite_in.json | 9 + .../core/testdata/integration_suite_out.json | 45 +++++ store/copr/batch_coprocessor.go | 162 +++++++++++++----- store/copr/batch_request_sender.go | 21 ++- store/copr/coprocessor.go | 2 + store/copr/mpp.go | 37 ++-- .../unistore/cophandler/closure_exec.go | 10 +- store/mockstore/unistore/cophandler/mpp.go | 22 +++ store/mockstore/unistore/tikv/server.go | 14 ++ table/tables/tables.go | 14 ++ 25 files changed, 547 insertions(+), 116 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 2b1961de2d973..34fd765bd610e 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -217,6 +217,12 @@ func (builder *RequestBuilder) SetAllowBatchCop(batchCop bool) *RequestBuilder { return builder } +// SetPartitionIDAndRanges sets `PartitionIDAndRanges` property. +func (builder *RequestBuilder) SetPartitionIDAndRanges(PartitionIDAndRanges []kv.PartitionIDAndRanges) *RequestBuilder { + builder.PartitionIDAndRanges = PartitionIDAndRanges + return builder +} + func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel { switch builder.Tp { case kv.ReqTypeAnalyze: diff --git a/executor/builder.go b/executor/builder.go index a4a56736985f2..d8021deec9ff6 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3199,15 +3199,18 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E } }) if useMPPExecution(b.ctx, v) { + plannercore.SetMppOrBatchCopForTableScan(v.GetTablePlan()) return b.buildMPPGather(v) } - ret, err := buildNoRangeTableReader(b, v) + ts, err := v.GetTableScan() if err != nil { b.err = err return nil } - - ts, err := v.GetTableScan() + if v.BatchCop { + ts.IsMPPOrBatchCop = true + } + ret, err := buildNoRangeTableReader(b, v) if err != nil { b.err = err return nil diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 3299b487599d8..3a8ce366742b6 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -65,7 +65,11 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { dagReq.EncodeType = tipb.EncodeType_TypeChunk } for _, mppTask := range pf.ExchangeSender.Tasks { - err := updateExecutorTableID(context.Background(), dagReq.RootExecutor, mppTask.TableID, true) + if mppTask.PartitionTableIDs != nil { + err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs) + } else { + err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, []int64{mppTask.TableID}) + } if err != nil { return errors.Trace(err) } diff --git a/executor/partition_table.go b/executor/partition_table.go index f38c2f1b01861..2ec4e6f421cdb 100644 --- a/executor/partition_table.go +++ b/executor/partition_table.go @@ -22,18 +22,20 @@ import ( "github.com/pingcap/tipb/go-tipb" ) -func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID int64, recursive bool) error { +func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, recursive bool, partitionIDs []int64) error { var child *tipb.Executor switch exec.Tp { case tipb.ExecType_TypeTableScan: - exec.TblScan.TableId = partitionID + exec.TblScan.TableId = partitionIDs[0] // For test coverage. if tmp := ctx.Value("nextPartitionUpdateDAGReq"); tmp != nil { m := tmp.(map[int64]struct{}) - m[partitionID] = struct{}{} + m[partitionIDs[0]] = struct{}{} } + case tipb.ExecType_TypePartitionTableScan: + exec.PartitionTableScan.PartitionIds = partitionIDs case tipb.ExecType_TypeIndexScan: - exec.IdxScan.TableId = partitionID + exec.IdxScan.TableId = partitionIDs[0] case tipb.ExecType_TypeSelection: child = exec.Selection.Child case tipb.ExecType_TypeAggregation, tipb.ExecType_TypeStreamAgg: @@ -54,7 +56,7 @@ func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID return errors.Trace(fmt.Errorf("unknown new tipb protocol %d", exec.Tp)) } if child != nil && recursive { - return updateExecutorTableID(ctx, child, partitionID, recursive) + return updateExecutorTableID(ctx, child, recursive, partitionIDs) } return nil } diff --git a/executor/table_reader.go b/executor/table_reader.go index 51199c5f0c648..adae20d401efa 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -263,20 +263,32 @@ func (e *TableReaderExecutor) Close() error { // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { if e.storeType == kv.TiFlash && e.kvRangeBuilder != nil { - // TiFlash cannot support to access multiple tables/partitions within one KVReq, so we have to build KVReq for each partition separately. - kvReqs, err := e.buildKVReqSeparately(ctx, ranges) - if err != nil { - return nil, err - } - var results []distsql.SelectResult - for _, kvReq := range kvReqs { - result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) + if !e.batchCop { + // TiFlash cannot support to access multiple tables/partitions within one KVReq, so we have to build KVReq for each partition separately. + kvReqs, err := e.buildKVReqSeparately(ctx, ranges) if err != nil { return nil, err } - results = append(results, result) + var results []distsql.SelectResult + for _, kvReq := range kvReqs { + result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) + if err != nil { + return nil, err + } + results = append(results, result) + } + return distsql.NewSerialSelectResults(results), nil } - return distsql.NewSerialSelectResults(results), nil + // Use PartitionTable Scan + kvReq, err := e.buildKVReqForPartitionTableScan(ctx, ranges) + if err != nil { + return nil, err + } + result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) + if err != nil { + return nil, err + } + return result, nil } kvReq, err := e.buildKVReq(ctx, ranges) @@ -300,7 +312,7 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ kvReqs := make([]*kv.Request, 0, len(kvRanges)) for i, kvRange := range kvRanges { e.kvRanges = append(e.kvRanges, kvRange...) - if err := updateExecutorTableID(ctx, e.dagPB.RootExecutor, pids[i], true); err != nil { + if err := updateExecutorTableID(ctx, e.dagPB.RootExecutor, true, []int64{pids[i]}); err != nil { return nil, err } var builder distsql.RequestBuilder @@ -325,6 +337,41 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ return kvReqs, nil } +func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) { + pids, kvRanges, err := e.kvRangeBuilder.buildKeyRangeSeparately(ranges) + if err != nil { + return nil, err + } + partitionIDAndRanges := make([]kv.PartitionIDAndRanges, 0, len(pids)) + for i, kvRange := range kvRanges { + partitionIDAndRanges = append(partitionIDAndRanges, kv.PartitionIDAndRanges{ + ID: pids[i], + KeyRanges: kvRange, + }) + } + if err := updateExecutorTableID(ctx, e.dagPB.RootExecutor, true, pids); err != nil { + return nil, err + } + var builder distsql.RequestBuilder + reqBuilder := builder.SetPartitionIDAndRanges(partitionIDAndRanges) + kvReq, err := reqBuilder. + SetDAGRequest(e.dagPB). + SetStartTS(e.startTS). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetStreaming(e.streaming). + SetReadReplicaScope(e.readReplicaScope). + SetFromSessionVars(e.ctx.GetSessionVars()). + SetFromInfoSchema(e.ctx.GetInfoSchema()). + SetMemTracker(e.memTracker). + SetStoreType(e.storeType). + SetAllowBatchCop(e.batchCop).Build() + if err != nil { + return nil, err + } + return kvReq, nil +} + func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) { var builder distsql.RequestBuilder var reqBuilder *distsql.RequestBuilder diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index a90b49da7575f..13a3cc3bbf432 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -467,7 +467,7 @@ func TestPartitionTable(t *testing.T) { // mock executor does not support use outer table as build side for outer join, so need to // force the inner table as build side tk.MustExec("set tidb_opt_mpp_outer_join_fixed_build_side=1") - failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(4)`) + failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(1)`) tk.MustQuery("select count(*) from t").Check(testkit.Rows("4")) failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") tk.MustExec("set @@session.tidb_partition_prune_mode='static-only'") @@ -489,7 +489,7 @@ func TestPartitionTable(t *testing.T) { tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") tk.MustExec("set @@session.tidb_allow_mpp=ON") // test if it is really work. - failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(8)`) + failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(2)`) tk.MustQuery("select count(*) from t1 , t where t1.a = t.a").Check(testkit.Rows("4")) // test partition prune tk.MustQuery("select count(*) from t1 , t where t1.a = t.a and t1.a < 2 and t.a < 2").Check(testkit.Rows("1")) @@ -507,7 +507,7 @@ func TestPartitionTable(t *testing.T) { tk.MustExec("insert into t2 values(3,0)") tk.MustExec("insert into t2 values(4,0)") // test with no partition table - failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(9)`) + failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(3)`) tk.MustQuery("select count(*) from t1 , t, t2 where t1.a = t.a and t2.a = t.a").Check(testkit.Rows("4")) failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") @@ -527,10 +527,10 @@ func TestPartitionTable(t *testing.T) { tk.MustExec("insert into t3 values(3,4)") tk.MustExec("insert into t3 values(4,6)") - failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(7)`) + failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(2)`) tk.MustQuery("select count(*) from t, t3 where t3.a = t.a and t3.b <= 4").Check(testkit.Rows("3")) failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") - failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(5)`) + failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(2)`) tk.MustQuery("select count(*) from t, t3 where t3.a = t.a and t3.b > 10").Check(testkit.Rows("0")) failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") failpoint.Disable("github.com/pingcap/tidb/executor/checkUseMPP") @@ -1142,3 +1142,55 @@ func TestForbidTiFlashIfExtraPhysTableIDIsNeeded(t *testing.T) { require.NotContains(t, res, "tikv") tk.MustExec("rollback") } + +func TestTiflashPartitionTableScan(t *testing.T) { + store, clean := createTiFlashStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(\n a int,\n primary key(a)\n) partition by range(a) (\n partition p1 values less than (10),\n partition p2 values less than (20),\n partition p3 values less than (30),\n partition p4 values less than (40),\n partition p5 values less than (50)\n);") + tk.MustExec("alter table t set tiflash replica 1") + tb := external.GetTableByName(t, tk, "test", "t") + err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + time.Sleep(2 * time.Second) + tk.MustExec("insert into t values(1),(11),(21),(31),(41);") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic';") + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\";") + // MPP + tk.MustExec("set @@session.tidb_allow_mpp=ON;") + tk.MustQuery("select count(*) from t where a < 12;").Check(testkit.Rows("2")) + + // BatchCop + tk.MustExec("set @@session.tidb_allow_mpp=OFF;") + tk.MustExec("set @@tidb_allow_batch_cop = 2;") + tk.MustQuery("select count(*) from t where a < 12;").Check(testkit.Rows("2")) + + // test retry batch cop + // MPP + wg := sync.WaitGroup{} + wg.Add(1) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", `return(true)`)) + go func() { + time.Sleep(100 * time.Millisecond) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy")) + wg.Done() + }() + tk.MustExec("set @@session.tidb_allow_mpp=ON;") + tk.MustQuery("select count(*) from t where a < 12;").Check(testkit.Rows("2")) + wg.Wait() + + // BatchCop + wg.Add(1) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", `return(true)`)) + go func() { + time.Sleep(100 * time.Millisecond) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy")) + wg.Done() + }() + tk.MustExec("set @@session.tidb_allow_mpp=OFF;") + tk.MustExec("set @@tidb_allow_batch_cop = 2;") + tk.MustQuery("select count(*) from t where a < 12;").Check(testkit.Rows("2")) + wg.Wait() +} diff --git a/go.mod b/go.mod index fc5418f0f5ba3..1a3a23f3c5977 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb-tools v6.0.0-alpha.0.20220309081549-563c2a342f9c+incompatible github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e - github.com/pingcap/tipb v0.0.0-20220110031732-29e23c62eeac + github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.26.0 @@ -90,10 +90,10 @@ require ( golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e + golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 golang.org/x/text v0.3.7 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba - golang.org/x/tools v0.1.8 + golang.org/x/tools v0.1.9 google.golang.org/api v0.54.0 google.golang.org/grpc v1.43.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index e7a881b95c29d..7b3ebb15e6fbc 100644 --- a/go.sum +++ b/go.sum @@ -584,8 +584,8 @@ github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqr github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops= github.com/pingcap/tidb-tools v6.0.0-alpha.0.20220309081549-563c2a342f9c+incompatible h1:oJa/SdxUweAwihGI83pBRle0LBrGPXMPkp2eElH6sF8= github.com/pingcap/tidb-tools v6.0.0-alpha.0.20220309081549-563c2a342f9c+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20220110031732-29e23c62eeac h1:bVklq/Np5uHUylW4Htyhi92TNrvIqVwht/+bHk0R/Tk= -github.com/pingcap/tipb v0.0.0-20220110031732-29e23c62eeac/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609 h1:BiCS1ZRnW0szOvTAa3gCqWIhyo+hv83SVaBgrUghXIU= +github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1025,8 +1025,9 @@ golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 h1:y/woIyUBFbpQGKS0u1aHF/40WUDnek3fPOyD08H5Vng= +golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1110,8 +1111,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.8 h1:P1HhGGuLW4aAclzjtmJdf0mJOjVUZUzOTqkAkWL+l6w= -golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= +golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8= +golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/kv/kv.go b/kv/kv.go index df6d985fee59a..c3c4ff5edfc91 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -313,6 +313,9 @@ type Request struct { Data []byte KeyRanges []KeyRange + // For PartitionTableScan used by tiflash. + PartitionIDAndRanges []PartitionIDAndRanges + // Concurrency is 1, if it only sends the request to a single storage unit when // ResponseIterator.Next is called. If concurrency is greater than 1, the request will be // sent to multiple storage units concurrently. @@ -360,6 +363,12 @@ type Request struct { Paging bool } +// PartitionIDAndRanges used by PartitionTableScan in tiflash. +type PartitionIDAndRanges struct { + ID int64 + KeyRanges []KeyRange +} + const ( // GlobalReplicaScope indicates the default replica scope for tidb to request GlobalReplicaScope = oracle.GlobalTxnScope diff --git a/kv/mpp.go b/kv/mpp.go index 231f0cccb2325..012a182aacd1a 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -33,6 +33,8 @@ type MPPTask struct { ID int64 // mppTaskID StartTs uint64 TableID int64 // physical table id + + PartitionTableIDs []int64 } // ToPB generates the pb structure. @@ -89,4 +91,6 @@ type MPPClient interface { type MPPBuildTasksRequest struct { KeyRanges []KeyRange StartTS uint64 + + PartitionIDAndRanges []PartitionIDAndRanges } diff --git a/planner/core/explain.go b/planner/core/explain.go index b18dc174ae622..3f70804c055d5 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -289,6 +290,9 @@ func (p *PhysicalTableScan) OperatorInfo(normalized bool) string { if p.stats.StatsVersion == statistics.PseudoVersion && !normalized { buffer.WriteString(", stats:pseudo") } + if p.StoreType == kv.TiFlash && p.Table.GetPartitionInfo() != nil && p.IsMPPOrBatchCop && p.ctx.GetSessionVars().UseDynamicPartitionPrune() { + buffer.WriteString(", PartitionTableScan:true") + } return buffer.String() } diff --git a/planner/core/fragment.go b/planner/core/fragment.go index b97aded29b5de..89ad2098890ba 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -16,6 +16,7 @@ package core import ( "context" + "sort" "time" "github.com/pingcap/errors" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) @@ -312,42 +314,26 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic } } + var req *kv.MPPBuildTasksRequest + var allPartitionsIDs []int64 + var err error splitedRanges, _ := distsql.SplitRangesAcrossInt64Boundary(ts.Ranges, false, false, ts.Table.IsCommonHandle) if ts.Table.GetPartitionInfo() != nil { tmp, _ := e.is.TableByID(ts.Table.ID) tbl := tmp.(table.PartitionedTable) - partitions, err := partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames) + var partitions []table.PhysicalTable + partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames) if err != nil { return nil, errors.Trace(err) } - var ret []*kv.MPPTask - for _, p := range partitions { - pid := p.GetPhysicalID() - meta := p.Meta() - kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && ts.Table.IsCommonHandle, splitedRanges, nil) - if err != nil { - return nil, errors.Trace(err) - } - tasks, err := e.constructMPPTasksForSinglePartitionTable(ctx, kvRanges, pid) - if err != nil { - return nil, errors.Trace(err) - } - ret = append(ret, tasks...) - } - return ret, nil + req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions) + } else { + req, err = e.constructMPPBuildTaskForNonPartitionTable(ts, splitedRanges) } - - kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{ts.Table.ID}, ts.Table.IsCommonHandle, splitedRanges, nil) if err != nil { return nil, errors.Trace(err) } - return e.constructMPPTasksForSinglePartitionTable(ctx, kvRanges, ts.Table.ID) -} -func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context.Context, kvRanges []kv.KeyRange, tableID int64) ([]*kv.MPPTask, error) { - req := &kv.MPPBuildTasksRequest{ - KeyRanges: kvRanges, - } ttl, err := time.ParseDuration(e.ctx.GetSessionVars().MPPStoreFailTTL) if err != nil { logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err)) @@ -357,9 +343,40 @@ func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context. if err != nil { return nil, errors.Trace(err) } + tasks := make([]*kv.MPPTask, 0, len(metas)) for _, meta := range metas { - tasks = append(tasks, &kv.MPPTask{Meta: meta, ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS), StartTs: e.startTS, TableID: tableID}) + task := &kv.MPPTask{Meta: meta, ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS), StartTs: e.startTS, TableID: ts.Table.ID, PartitionTableIDs: allPartitionsIDs} + tasks = append(tasks, task) } return tasks, nil } + +func (e *mppTaskGenerator) constructMPPBuildTaskReqForPartitionedTable(ts *PhysicalTableScan, splitedRanges []*ranger.Range, partitions []table.PhysicalTable) (*kv.MPPBuildTasksRequest, []int64, error) { + sort.Slice(partitions, func(i, j int) bool { + return partitions[i].GetPhysicalID() < partitions[j].GetPhysicalID() + }) + partitionIDAndRanges := make([]kv.PartitionIDAndRanges, len(partitions)) + allPartitionsIDs := make([]int64, len(partitions)) + // Get region info for each partition + for i, p := range partitions { + pid := p.GetPhysicalID() + meta := p.Meta() + kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && ts.Table.IsCommonHandle, splitedRanges, nil) + if err != nil { + return nil, nil, errors.Trace(err) + } + partitionIDAndRanges[i].ID = pid + partitionIDAndRanges[i].KeyRanges = kvRanges + allPartitionsIDs[i] = pid + } + return &kv.MPPBuildTasksRequest{PartitionIDAndRanges: partitionIDAndRanges}, allPartitionsIDs, nil +} + +func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(ts *PhysicalTableScan, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) { + kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{ts.Table.ID}, ts.Table.IsCommonHandle, splitedRanges, nil) + if err != nil { + return nil, errors.Trace(err) + } + return &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}, nil +} diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 81b15202e2776..e47a4ef72a9d6 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -6205,3 +6205,41 @@ func TestIssue32632(t *testing.T) { tk.MustExec("drop table if exists partsupp") tk.MustExec("drop table if exists supplier") } + +func TestTiFlashPartitionTableScan(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@tidb_enforce_mpp = on") + tk.MustExec("set @@tidb_allow_batch_cop = 2") + tk.MustExec("drop table if exists rp_t;") + tk.MustExec("drop table if exists hp_t;") + tk.MustExec("create table rp_t(a int) partition by RANGE (a) (PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16), PARTITION p3 VALUES LESS THAN (21));") + tk.MustExec("create table hp_t(a int) partition by hash(a) partitions 4;") + tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "rp_t", L: "rp_t"}) + require.NoError(t, err) + tbl2, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "hp_t", L: "hp_t"}) + require.NoError(t, err) + // Set the hacked TiFlash replica for explain tests. + tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true} + tbl2.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true} + var input []string + var output []struct { + SQL string + Plan []string + } + integrationSuiteData := core.GetIntegrationSuiteData() + integrationSuiteData.GetTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...)) + } + tk.MustExec("drop table rp_t;") + tk.MustExec("drop table hp_t;") +} diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index d60eb2ae24168..984ce5a419cae 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -129,6 +129,17 @@ func (p *PhysicalTableReader) GetTableScan() (*PhysicalTableScan, error) { return tableScans[0], nil } +// SetMppOrBatchCopForTableScan set IsMPPOrBatchCop for all TableScan. +func SetMppOrBatchCopForTableScan(curPlan PhysicalPlan) { + if ts, ok := curPlan.(*PhysicalTableScan); ok { + ts.IsMPPOrBatchCop = true + } + children := curPlan.Children() + for _, child := range children { + SetMppOrBatchCopForTableScan(child) + } +} + // GetPhysicalTableReader returns PhysicalTableReader for logical TiKVSingleGather. func (sg *TiKVSingleGather) GetPhysicalTableReader(schema *expression.Schema, stats *property.StatsInfo, props ...*property.PhysicalProperty) *PhysicalTableReader { reader := PhysicalTableReader{}.Init(sg.ctx, sg.blockOffset) @@ -478,6 +489,8 @@ type PhysicalTableScan struct { StoreType kv.StoreType + IsMPPOrBatchCop bool // Used for tiflash PartitionTableScan. + // The table scan may be a partition, rather than a real table. // TODO: clean up this field. After we support dynamic partitioning, table scan // works on the whole partition table, and `isPartition` is not used. diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index ad5e24efbc24f..9f45ddd54d133 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -176,6 +176,9 @@ func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*t // ToPB implements PhysicalPlan ToPB interface. func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { + if storeType == kv.TiFlash && p.Table.GetPartitionInfo() != nil && p.IsMPPOrBatchCop && p.ctx.GetSessionVars().UseDynamicPartitionPrune() { + return p.partitionTableScanToPBForFlash(ctx) + } tsExec := tables.BuildTableScanFromInfos(p.Table, p.Columns) tsExec.Desc = p.Desc if p.isPartition { @@ -189,6 +192,14 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec, ExecutorId: &executorID}, err } +func (p *PhysicalTableScan) partitionTableScanToPBForFlash(ctx sessionctx.Context) (*tipb.Executor, error) { + ptsExec := tables.BuildPartitionTableScanFromInfos(p.Table, p.Columns) + ptsExec.Desc = p.Desc + executorID := p.ExplainID().String() + err := SetPBColumnsDefaultValue(ctx, ptsExec.Columns, p.Columns) + return &tipb.Executor{Tp: tipb.ExecType_TypePartitionTableScan, PartitionTableScan: ptsExec, ExecutorId: &executorID}, err +} + // checkCoverIndex checks whether we can pass unique info to TiKV. We should push it if and only if the length of // range and index are equal. func checkCoverIndex(idx *model.IndexInfo, ranges []*ranger.Range) bool { diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index 2fafa02cc5542..a93847591c21a 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -873,5 +873,14 @@ "cases": [ "explain format = 'brief' select sum(ps_supplycost) from partsupp, supplier where ps_suppkey = s_suppkey;" ] + }, + { + "name": "TestTiFlashPartitionTableScan", + "cases": [ + "explain format = 'brief' select * from rp_t where a = 1 or a = 20", + "explain format = 'brief' select * from hp_t where a = 1 or a = 20", + "explain format = 'brief' select count(*) from rp_t where a = 1 or a = 20", + "explain format = 'brief' select count(*) from hp_t where a = 1 or a = 20" + ] } ] diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index e7f322c89a10c..242bac6254942 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -6582,5 +6582,50 @@ ] } ] + }, + { + "Name": "TestTiFlashPartitionTableScan", + "Cases": [ + { + "SQL": "explain format = 'brief' select * from rp_t where a = 1 or a = 20", + "Plan": [ + "TableReader 20.00 root partition:p0,p3 data:ExchangeSender", + "└─ExchangeSender 20.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─Selection 20.00 batchCop[tiflash] or(eq(test.rp_t.a, 1), eq(test.rp_t.a, 20))", + " └─TableFullScan 10000.00 batchCop[tiflash] table:rp_t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select * from hp_t where a = 1 or a = 20", + "Plan": [ + "TableReader 20.00 root partition:p0,p1 data:ExchangeSender", + "└─ExchangeSender 20.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─Selection 20.00 batchCop[tiflash] or(eq(test.hp_t.a, 1), eq(test.hp_t.a, 20))", + " └─TableFullScan 10000.00 batchCop[tiflash] table:hp_t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from rp_t where a = 1 or a = 20", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#5)->Column#3", + "└─TableReader 1.00 root partition:p0,p3 data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection 20.00 batchCop[tiflash] or(eq(test.rp_t.a, 1), eq(test.rp_t.a, 20))", + " └─TableFullScan 10000.00 batchCop[tiflash] table:rp_t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from hp_t where a = 1 or a = 20", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#5)->Column#3", + "└─TableReader 1.00 root partition:p0,p1 data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection 20.00 batchCop[tiflash] or(eq(test.hp_t.a, 1), eq(test.hp_t.a, 20))", + " └─TableFullScan 10000.00 batchCop[tiflash] table:hp_t keep order:false, stats:pseudo" + ] + } + ] } ] diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index fe83d40a835b2..32f38a72f7576 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/log" "github.com/pingcap/tidb/kv" @@ -49,7 +48,10 @@ type batchCopTask struct { cmdType tikvrpc.CmdType ctx *tikv.RPCContext - regionInfos []RegionInfo + regionInfos []RegionInfo // region info for single physical table + // PartitionTableRegions indicates region infos for each partition table, used by scanning partitions in batch. + // Thus, one of `regionInfos` and `PartitionTableRegions` must be nil. + PartitionTableRegions []*coprocessor.TableRegions } type batchCopResponse struct { @@ -523,24 +525,47 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] return ret } -func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { +func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { + return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) +} + +func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64, partitionIDs []int64) ([]*batchCopTask, error) { + batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + if err != nil { + return nil, err + } + // generate tableRegions for batchCopTasks + convertRegionInfosToPartitionTableRegions(batchTasks, partitionIDs) + return batchTasks, nil +} + +// When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. +// At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`. +// Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table. +func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { cache := store.GetRegionCache() start := time.Now() const cmdType = tikvrpc.CmdBatchCop - rangesLen := ranges.Len() + rangesLen := 0 + for { - locations, err := cache.SplitKeyRangesByLocations(bo, ranges) - if err != nil { - return nil, errors.Trace(err) - } var tasks []*copTask - for _, lo := range locations { - tasks = append(tasks, &copTask{ - region: lo.Location.Region, - ranges: lo.Ranges, - cmdType: cmdType, - storeType: storeType, - }) + rangesLen = 0 + for i, ranges := range rangesForEachPhysicalTable { + rangesLen += ranges.Len() + locations, err := cache.SplitKeyRangesByLocations(bo, ranges) + if err != nil { + return nil, errors.Trace(err) + } + for _, lo := range locations { + tasks = append(tasks, &copTask{ + region: lo.Location.Region, + ranges: lo.Ranges, + cmdType: cmdType, + storeType: storeType, + partitionIndex: int64(i), + }) + } } var batchTasks []*batchCopTask @@ -565,13 +590,13 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges } allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store) if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { - batchCop.regionInfos = append(batchCop.regionInfos, RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}) + batchCop.regionInfos = append(batchCop.regionInfos, RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}) } else { batchTask := &batchCopTask{ storeAddr: rpcCtx.Addr, cmdType: cmdType, ctx: rpcCtx, - regionInfos: []RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}}, + regionInfos: []RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}}, } storeTaskMap[rpcCtx.Addr] = batchTask } @@ -580,7 +605,7 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges // As mentioned above, nil rpcCtx is always attributed to failed stores. // It's equal to long poll the store but get no response. Here we'd better use // TiFlash error to trigger the TiKV fallback mechanism. - err = bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) + err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) if err != nil { return nil, errors.Trace(err) } @@ -609,7 +634,7 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges } if elapsed := time.Since(start); elapsed > time.Millisecond*500 { - logutil.BgLogger().Warn("buildBatchCopTasks takes too much time", + logutil.BgLogger().Warn("buildBatchCopTasksCore takes too much time", zap.Duration("elapsed", elapsed), zap.Duration("balanceElapsed", balanceElapsed), zap.Int("range len", rangesLen), @@ -620,14 +645,56 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges } } +func convertRegionInfosToPartitionTableRegions(batchTasks []*batchCopTask, partitionIDs []int64) { + for _, copTask := range batchTasks { + tableRegions := make([]*coprocessor.TableRegions, len(partitionIDs)) + // init coprocessor.TableRegions + for j, pid := range partitionIDs { + tableRegions[j] = &coprocessor.TableRegions{ + PhysicalTableId: pid, + } + } + // fill region infos + for _, ri := range copTask.regionInfos { + tableRegions[ri.PartitionIndex].Regions = append(tableRegions[ri.PartitionIndex].Regions, + ri.toCoprocessorRegionInfo()) + } + count := 0 + // clear empty table region + for j := 0; j < len(tableRegions); j++ { + if len(tableRegions[j].Regions) != 0 { + tableRegions[count] = tableRegions[j] + count++ + } + } + copTask.PartitionTableRegions = tableRegions[:count] + copTask.regionInfos = nil + } +} + func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.Variables, option *kv.ClientSendOption) kv.Response { if req.KeepOrder || req.Desc { return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")} } ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) - ranges := NewKeyRanges(req.KeyRanges) - tasks, err := buildBatchCopTasks(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0) + + var tasks []*batchCopTask + var err error + if req.PartitionIDAndRanges != nil { + // For Partition Table Scan + keyRanges := make([]*KeyRanges, 0, len(req.PartitionIDAndRanges)) + partitionIDs := make([]int64, 0, len(req.PartitionIDAndRanges)) + for _, pi := range req.PartitionIDAndRanges { + keyRanges = append(keyRanges, NewKeyRanges(pi.KeyRanges)) + partitionIDs = append(partitionIDs, pi.ID) + } + tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs) + } else { + ranges := NewKeyRanges(req.KeyRanges) + tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0) + } + if err != nil { return copErrorResponse{err} } @@ -765,13 +832,34 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * // Merge all ranges and request again. func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) { - var ranges []kv.KeyRange - for _, ri := range batchTask.regionInfos { - ri.Ranges.Do(func(ran *kv.KeyRange) { - ranges = append(ranges, *ran) - }) + if batchTask.regionInfos != nil { + var ranges []kv.KeyRange + for _, ri := range batchTask.regionInfos { + ri.Ranges.Do(func(ran *kv.KeyRange) { + ranges = append(ranges, *ran) + }) + } + ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0, false, 0) + return ret, err + } + // Retry Partition Table Scan + keyRanges := make([]*KeyRanges, 0, len(batchTask.PartitionTableRegions)) + pid := make([]int64, 0, len(batchTask.PartitionTableRegions)) + for _, trs := range batchTask.PartitionTableRegions { + pid = append(pid, trs.PhysicalTableId) + ranges := make([]kv.KeyRange, 0, len(trs.Regions)) + for _, ri := range trs.Regions { + for _, ran := range ri.Ranges { + ranges = append(ranges, kv.KeyRange{ + StartKey: ran.Start, + EndKey: ran.End, + }) + } + } + keyRanges = append(keyRanges, NewKeyRanges(ranges)) } - return buildBatchCopTasks(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0, false, 0) + ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, nil, 0, false, 0, pid) + return ret, err } const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash. @@ -780,22 +868,16 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo) var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) for _, ri := range task.regionInfos { - regionInfos = append(regionInfos, &coprocessor.RegionInfo{ - RegionId: ri.Region.GetID(), - RegionEpoch: &metapb.RegionEpoch{ - ConfVer: ri.Region.GetConfVer(), - Version: ri.Region.GetVer(), - }, - Ranges: ri.Ranges.ToPBRanges(), - }) + regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo()) } copReq := coprocessor.BatchRequest{ - Tp: b.req.Tp, - StartTs: b.req.StartTs, - Data: b.req.Data, - SchemaVer: b.req.SchemaVar, - Regions: regionInfos, + Tp: b.req.Tp, + StartTs: b.req.StartTs, + Data: b.req.Data, + SchemaVer: b.req.SchemaVar, + Regions: regionInfos, + TableRegions: task.PartitionTableRegions, } req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{ diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index 7a77ef9ba7150..b976d26a59ab3 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/metapb" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/tikv" @@ -29,10 +30,22 @@ import ( // RegionInfo contains region related information for batchCopTask type RegionInfo struct { - Region tikv.RegionVerID - Meta *metapb.Region - Ranges *KeyRanges - AllStores []uint64 + Region tikv.RegionVerID + Meta *metapb.Region + Ranges *KeyRanges + AllStores []uint64 + PartitionIndex int64 // used by PartitionTableScan, indicates the n-th partition of the partition table +} + +func (ri *RegionInfo) toCoprocessorRegionInfo() *coprocessor.RegionInfo { + return &coprocessor.RegionInfo{ + RegionId: ri.Region.GetID(), + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: ri.Region.GetConfVer(), + Version: ri.Region.GetVer(), + }, + Ranges: ri.Ranges.ToPBRanges(), + } } // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index f39fe9b6163fc..2836f69271afd 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -158,6 +158,8 @@ type copTask struct { eventCb trxevents.EventCallback paging bool pagingSize uint64 + + partitionIndex int64 // used by balanceBatchCopTask in PartitionTableScan } func (r *copTask) String() string { diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 1a19d80ed326c..f385fedacc5b3 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/log" "github.com/pingcap/tidb/config" @@ -65,11 +64,24 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta { func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) ([]kv.MPPTaskMeta, error) { ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) - if req.KeyRanges == nil { - return c.selectAllTiFlashStore(), nil + var tasks []*batchCopTask + var err error + if req.PartitionIDAndRanges != nil { + rangesForEachPartition := make([]*KeyRanges, len(req.PartitionIDAndRanges)) + partitionIDs := make([]int64, len(req.PartitionIDAndRanges)) + for i, p := range req.PartitionIDAndRanges { + rangesForEachPartition[i] = NewKeyRanges(p.KeyRanges) + partitionIDs[i] = p.ID + } + tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, mppStoreLastFailTime, ttl, true, 20, partitionIDs) + } else { + if req.KeyRanges == nil { + return c.selectAllTiFlashStore(), nil + } + ranges := NewKeyRanges(req.KeyRanges) + tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, mppStoreLastFailTime, ttl, true, 20) } - ranges := NewKeyRanges(req.KeyRanges) - tasks, err := buildBatchCopTasks(bo, c.store, ranges, kv.TiFlash, mppStoreLastFailTime, ttl, true, 20) + if err != nil { return nil, errors.Trace(err) } @@ -200,14 +212,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req originalTask, ok := req.Meta.(*batchCopTask) if ok { for _, ri := range originalTask.regionInfos { - regionInfos = append(regionInfos, &coprocessor.RegionInfo{ - RegionId: ri.Region.GetID(), - RegionEpoch: &metapb.RegionEpoch{ - ConfVer: ri.Region.GetConfVer(), - Version: ri.Region.GetVer(), - }, - Ranges: ri.Ranges.ToPBRanges(), - }) + regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo()) } } @@ -222,6 +227,12 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req SchemaVer: req.SchemaVar, Regions: regionInfos, } + if originalTask != nil { + mppReq.TableRegions = originalTask.PartitionTableRegions + if mppReq.TableRegions != nil { + mppReq.Regions = nil + } + } wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPTask, mppReq, kvrpcpb.Context{}) wrappedReq.StoreTp = tikvrpc.TiFlash diff --git a/store/mockstore/unistore/cophandler/closure_exec.go b/store/mockstore/unistore/cophandler/closure_exec.go index 03170cf4dfac7..e7bac738bc67f 100644 --- a/store/mockstore/unistore/cophandler/closure_exec.go +++ b/store/mockstore/unistore/cophandler/closure_exec.go @@ -199,7 +199,8 @@ func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, p func isScanNode(executor *tipb.Executor) bool { switch executor.Tp { case tipb.ExecType_TypeTableScan, - tipb.ExecType_TypeIndexScan: + tipb.ExecType_TypeIndexScan, + tipb.ExecType_TypePartitionTableScan: return true default: return false @@ -263,6 +264,13 @@ func newClosureExecutor(dagCtx *dagContext, outputOffsets []uint32, scanExec *ti e.idxScanCtx.prevVals = make([][]byte, e.idxScanCtx.columnLen) } e.scanType = IndexScan + case tipb.ExecType_TypePartitionTableScan: + dagCtx.setColumnInfo(scanExec.PartitionTableScan.Columns) + dagCtx.primaryCols = scanExec.PartitionTableScan.PrimaryColumnIds + tblScan := scanExec.PartitionTableScan + e.unique = true + e.scanCtx.desc = tblScan.Desc + e.scanType = TableScan default: panic(fmt.Sprintf("unknown first executor type %s", scanExec.Tp)) } diff --git a/store/mockstore/unistore/cophandler/mpp.go b/store/mockstore/unistore/cophandler/mpp.go index cc8954e025ce9..fcab223f170fa 100644 --- a/store/mockstore/unistore/cophandler/mpp.go +++ b/store/mockstore/unistore/cophandler/mpp.go @@ -90,6 +90,25 @@ func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, return ts, err } +func (b *mppExecBuilder) buildMPPPartitionTableScan(pb *tipb.PartitionTableScan) (*tableScanExec, error) { + ranges, err := extractKVRanges(b.dbReader.StartKey, b.dbReader.EndKey, b.dagCtx.keyRanges, false) + if err != nil { + return nil, errors.Trace(err) + } + ts := &tableScanExec{ + baseMPPExec: baseMPPExec{sc: b.sc, mppCtx: b.mppCtx}, + startTS: b.dagCtx.startTS, + kvRanges: ranges, + dbReader: b.dbReader, + } + for _, col := range pb.Columns { + ft := fieldTypeFromPBColumn(col) + ts.fieldTypes = append(ts.fieldTypes, ft) + } + ts.decoder, err = newRowDecoder(pb.Columns, ts.fieldTypes, pb.PrimaryColumnIds, b.sc.TimeZone) + return ts, err +} + func (b *mppExecBuilder) buildIdxScan(pb *tipb.IndexScan) (*indexScanExec, error) { ranges, err := extractKVRanges(b.dbReader.StartKey, b.dbReader.EndKey, b.dagCtx.keyRanges, pb.Desc) if err != nil { @@ -457,6 +476,9 @@ func (b *mppExecBuilder) buildMPPExecutor(exec *tipb.Executor) (mppExec, error) return b.buildLimit(exec.Limit) case tipb.ExecType_TypeTopN: return b.buildTopN(exec.TopN) + case tipb.ExecType_TypePartitionTableScan: + ts := exec.PartitionTableScan + return b.buildMPPPartitionTableScan(ts) default: return nil, errors.Errorf(ErrExecutorNotSupportedMsg + exec.Tp.String()) } diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index 44610ba963c86..36bf176b9f405 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -583,6 +583,13 @@ func (svr *Server) BatchCoprocessor(req *coprocessor.BatchRequest, batchCopServe ctx.finish() } }() + if req.TableRegions != nil { + // Support PartitionTableScan for BatchCop + req.Regions = req.Regions[:] + for _, tr := range req.TableRegions { + req.Regions = append(req.Regions, tr.Regions...) + } + } for _, ri := range req.Regions { cop := coprocessor.Request{ Tp: kv.ReqTypeDAG, @@ -643,6 +650,13 @@ func (svr *Server) DispatchMPPTask(_ context.Context, _ *mpp.DispatchTaskRequest func (svr *Server) executeMPPDispatch(ctx context.Context, req *mpp.DispatchTaskRequest, storeAddr string, storeID uint64, handler *cophandler.MPPTaskHandler) error { var reqCtx *requestCtx + if len(req.TableRegions) > 0 { + // Simple unistore logic for PartitionTableScan. + for _, tr := range req.TableRegions { + req.Regions = append(req.Regions, tr.Regions...) + } + } + if len(req.Regions) > 0 { kvContext := &kvrpcpb.Context{ RegionId: req.Regions[0].RegionId, diff --git a/table/tables/tables.go b/table/tables/tables.go index 56bbaacf55271..8fee9660a698d 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -1919,6 +1919,20 @@ func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.Co return tsExec } +// BuildPartitionTableScanFromInfos build tipb.PartitonTableScan with *model.TableInfo and *model.ColumnInfo. +func BuildPartitionTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.ColumnInfo) *tipb.PartitionTableScan { + pkColIds := TryGetCommonPkColumnIds(tableInfo) + tsExec := &tipb.PartitionTableScan{ + TableId: tableInfo.ID, + Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle), + PrimaryColumnIds: pkColIds, + } + if tableInfo.IsCommonHandle { + tsExec.PrimaryPrefixColumnIds = PrimaryPrefixColumnIDs(tableInfo) + } + return tsExec +} + // TemporaryTable is used to store transaction-specific or session-specific information for global / local temporary tables. // For example, stats and autoID should have their own copies of data, instead of being shared by all sessions. type TemporaryTable struct {