From f5a3a38bfe6f8062cd9a78a649fec5a3855e2035 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 15 Dec 2020 11:42:55 +0800 Subject: [PATCH] *: fix bug that broadcast join/MPP not compatible with clustered index (#21663) --- ddl/reorg.go | 10 ++++---- distsql/request_builder.go | 42 ++++++++++++++++----------------- distsql/request_builder_test.go | 2 +- executor/analyze.go | 9 ++----- executor/builder.go | 13 ++++------ executor/checksum.go | 9 +++++-- executor/mpp_gather.go | 12 ++++++---- executor/table_reader.go | 4 +--- planner/core/plan_to_pb.go | 5 +++- 9 files changed, 53 insertions(+), 53 deletions(-) diff --git a/ddl/reorg.go b/ddl/reorg.go index f2a3ff9362cc8..67e50381be1dc 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -408,13 +408,13 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta } var b distsql.RequestBuilder var builder *distsql.RequestBuilder - if !tbl.Meta().IsCommonHandle { - ranges := ranger.FullIntRange(false) - builder = b.SetTableRanges(tbl.GetPhysicalID(), ranges, nil) + var ranges []*ranger.Range + if tbl.Meta().IsCommonHandle { + ranges = ranger.FullNotNullRange() } else { - ranges := ranger.FullNotNullRange() - builder = b.SetCommonHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), ranges) + ranges = ranger.FullIntRange(false) } + builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges, nil) builder.SetDAGRequest(dagPB). SetStartTS(startTS). SetKeepOrder(true). diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 39fe8c7f1f2f8..8fe631b3bbff8 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -48,17 +48,10 @@ func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBu return builder } -// SetTableRangesForTables sets "KeyRanges" for "kv.Request" by converting multiples "tableRanges" -// to "KeyRanges" firstly. -func (builder *RequestBuilder) SetTableRangesForTables(tids []int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { - if builder.err == nil { - builder.Request.KeyRanges = TablesRangesToKVRanges(tids, tableRanges, fb) - } - return builder -} - // SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges" // to "KeyRanges" firstly. +// Note this function should be deleted or at least not exported, but currently +// br refers it, so have to keep it. func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { if builder.err == nil { builder.Request.KeyRanges = TableRangesToKVRanges(tid, tableRanges, fb) @@ -84,20 +77,17 @@ func (builder *RequestBuilder) SetIndexRangesForTables(sc *stmtctx.StatementCont return builder } -// SetCommonHandleRanges sets "KeyRanges" for "kv.Request" by converting common handle range +// SetHandleRanges sets "KeyRanges" for "kv.Request" by converting table handle range // "ranges" to "KeyRanges" firstly. -func (builder *RequestBuilder) SetCommonHandleRanges(sc *stmtctx.StatementContext, tid int64, ranges []*ranger.Range) *RequestBuilder { - if builder.err == nil { - builder.Request.KeyRanges, builder.err = CommonHandleRangesToKVRanges(sc, []int64{tid}, ranges) - } - return builder +func (builder *RequestBuilder) SetHandleRanges(sc *stmtctx.StatementContext, tid int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { + return builder.SetHandleRangesForTables(sc, []int64{tid}, isCommonHandle, ranges, fb) } -// SetCommonHandleRangesForTables sets "KeyRanges" for "kv.Request" by converting common handle range +// SetHandleRangesForTables sets "KeyRanges" for "kv.Request" by converting table handle range // "ranges" to "KeyRanges" firstly for multiple tables. -func (builder *RequestBuilder) SetCommonHandleRangesForTables(sc *stmtctx.StatementContext, tid []int64, ranges []*ranger.Range) *RequestBuilder { +func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { if builder.err == nil { - builder.Request.KeyRanges, builder.err = CommonHandleRangesToKVRanges(sc, tid, ranges) + builder.Request.KeyRanges, builder.err = TableHandleRangesToKVRanges(sc, tid, isCommonHandle, ranges, fb) } return builder } @@ -255,13 +245,23 @@ func (builder *RequestBuilder) SetTiDBServerID(serverID uint64) *RequestBuilder return builder } +// TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables. +func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { + if !isCommonHandle { + return tablesRangesToKVRanges(tid, ranges, fb), nil + } + return CommonHandleRangesToKVRanges(sc, tid, ranges) +} + // TableRangesToKVRanges converts table ranges to "KeyRange". +// Note this function should not be exported, but currently +// br refers to it, so have to keep it. func TableRangesToKVRanges(tid int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange { - return TablesRangesToKVRanges([]int64{tid}, ranges, fb) + return tablesRangesToKVRanges([]int64{tid}, ranges, fb) } -// TablesRangesToKVRanges converts table ranges to "KeyRange". -func TablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange { +// tablesRangesToKVRanges converts table ranges to "KeyRange". +func tablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange { if fb == nil || fb.Hist == nil { return tableRangesToKVRangesWithoutSplit(tids, ranges) } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 4fe2617efffe1..d97c34a853605 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -278,7 +278,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) { }, } - actual, err := (&RequestBuilder{}).SetTableRanges(12, ranges, nil). + actual, err := (&RequestBuilder{}).SetHandleRanges(nil, 12, false, ranges, nil). SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). diff --git a/executor/analyze.go b/executor/analyze.go index ece7a77ec26b6..091abb8575c74 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -278,7 +278,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang var builder distsql.RequestBuilder var kvReqBuilder *distsql.RequestBuilder if e.isCommonHandle && e.idxInfo.Primary { - kvReqBuilder = builder.SetCommonHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, ranges) + kvReqBuilder = builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, true, ranges, nil) } else { kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, e.idxInfo.ID, ranges) } @@ -469,12 +469,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder - var reqBuilder *distsql.RequestBuilder - if e.handleCols != nil && !e.handleCols.IsInt() { - reqBuilder = builder.SetCommonHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, ranges) - } else { - reqBuilder = builder.SetTableRangesForTables(e.tableID.CollectIDs, ranges, nil) - } + reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil) // Always set KeepOrder of the request to be true, in order to compute // correct `correlation` of columns. kvReq, err := reqBuilder. diff --git a/executor/builder.go b/executor/builder.go index d6d4506e7ad1d..845be050c7950 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3337,16 +3337,11 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(int64) ([]kv.KeyRange for _, p := range h.partitions { pid := p.GetPhysicalID() meta := p.Meta() - if meta != nil && meta.IsCommonHandle { - kvRange, err := distsql.CommonHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, h.ranges) - if err != nil { - return nil, err - } - ret = append(ret, kvRange...) - } else { - kvRange := distsql.TableRangesToKVRanges(pid, h.ranges, nil) - ret = append(ret, kvRange...) + kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, h.ranges, nil) + if err != nil { + return nil, err } + ret = append(ret, kvRange...) } return ret, nil } diff --git a/executor/checksum.go b/executor/checksum.go index 3f735b2838445..c6c28fe593754 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -233,10 +233,15 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6 Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor, } - ranges := ranger.FullIntRange(false) + var ranges []*ranger.Range + if c.TableInfo.IsCommonHandle { + ranges = ranger.FullNotNullRange() + } else { + ranges = ranger.FullIntRange(false) + } var builder distsql.RequestBuilder - return builder.SetTableRanges(tableID, ranges, nil). + return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil). SetChecksumRequest(checksum). SetStartTS(c.StartTs). SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency()). diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index aba6f9cd04e2b..668c8681f7b20 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -54,8 +54,9 @@ type MPPGather struct { } func (e *MPPGather) constructMPPTasksImpl(ctx context.Context, p *plannercore.Fragment) ([]*kv.MPPTask, error) { + isCommonHandle := p.TableScan.Table.IsCommonHandle if p.TableScan.Table.GetPartitionInfo() == nil { - return e.constructSinglePhysicalTable(ctx, p.TableScan.Table.ID, p.TableScan.Ranges) + return e.constructSinglePhysicalTable(ctx, p.TableScan.Table.ID, isCommonHandle, p.TableScan.Ranges) } tmp, _ := e.is.TableByID(p.TableScan.Table.ID) tbl := tmp.(table.PartitionedTable) @@ -65,7 +66,7 @@ func (e *MPPGather) constructMPPTasksImpl(ctx context.Context, p *plannercore.Fr } allTasks := make([]*kv.MPPTask, 0) for _, part := range partitions { - partTasks, err := e.constructSinglePhysicalTable(ctx, part.GetPhysicalID(), p.TableScan.Ranges) + partTasks, err := e.constructSinglePhysicalTable(ctx, part.GetPhysicalID(), isCommonHandle, p.TableScan.Ranges) if err != nil { return nil, errors.Trace(err) } @@ -75,8 +76,11 @@ func (e *MPPGather) constructMPPTasksImpl(ctx context.Context, p *plannercore.Fr } // single physical table means a table without partitions or a single partition in a partition table. -func (e *MPPGather) constructSinglePhysicalTable(ctx context.Context, tableID int64, ranges []*ranger.Range) ([]*kv.MPPTask, error) { - kvRanges := distsql.TableRangesToKVRanges(tableID, ranges, nil) +func (e *MPPGather) constructSinglePhysicalTable(ctx context.Context, tableID int64, isCommonHandle bool, ranges []*ranger.Range) ([]*kv.MPPTask, error) { + kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{tableID}, isCommonHandle, ranges, nil) + if err != nil { + return nil, errors.Trace(err) + } req := &kv.MPPBuildTasksRequest{KeyRanges: kvRanges} metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req) if err != nil { diff --git a/executor/table_reader.go b/executor/table_reader.go index 3c5f7fade5ab1..4a73006eadb20 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -219,10 +219,8 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra return nil, err } reqBuilder = builder.SetKeyRanges(kvRange) - } else if e.table.Meta() != nil && e.table.Meta().IsCommonHandle { - reqBuilder = builder.SetCommonHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), ranges) } else { - reqBuilder = builder.SetTableRanges(getPhysicalTableID(e.table), ranges, e.feedback) + reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback) } kvReq, err := reqBuilder. SetDAGRequest(e.dagPB). diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index bc63e87e52773..8ae2f76b04ca1 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -159,7 +159,10 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) executorID := "" if storeType == kv.TiFlash && p.IsGlobalRead { tsExec.NextReadEngine = tipb.EngineType_TiFlash - ranges := distsql.TableRangesToKVRanges(tsExec.TableId, p.Ranges, nil) + ranges, err := distsql.TableHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tsExec.TableId}, p.Table.IsCommonHandle, p.Ranges, nil) + if err != nil { + return nil, err + } for _, keyRange := range ranges { tsExec.Ranges = append(tsExec.Ranges, tipb.KeyRange{Low: keyRange.StartKey, High: keyRange.EndKey}) }