Skip to content

Commit

Permalink
*: fix bug that broadcast join/MPP not compatible with clustered index (
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Dec 15, 2020
1 parent 77d1707 commit f5a3a38
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 53 deletions.
10 changes: 5 additions & 5 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,13 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
}
var b distsql.RequestBuilder
var builder *distsql.RequestBuilder
if !tbl.Meta().IsCommonHandle {
ranges := ranger.FullIntRange(false)
builder = b.SetTableRanges(tbl.GetPhysicalID(), ranges, nil)
var ranges []*ranger.Range
if tbl.Meta().IsCommonHandle {
ranges = ranger.FullNotNullRange()
} else {
ranges := ranger.FullNotNullRange()
builder = b.SetCommonHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), ranges)
ranges = ranger.FullIntRange(false)
}
builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges, nil)
builder.SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeepOrder(true).
Expand Down
42 changes: 21 additions & 21 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,10 @@ func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBu
return builder
}

// SetTableRangesForTables sets "KeyRanges" for "kv.Request" by converting multiples "tableRanges"
// to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableRangesForTables(tids []int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
if builder.err == nil {
builder.Request.KeyRanges = TablesRangesToKVRanges(tids, tableRanges, fb)
}
return builder
}

// SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges"
// to "KeyRanges" firstly.
// Note this function should be deleted or at least not exported, but currently
// br refers it, so have to keep it.
func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
if builder.err == nil {
builder.Request.KeyRanges = TableRangesToKVRanges(tid, tableRanges, fb)
Expand All @@ -84,20 +77,17 @@ func (builder *RequestBuilder) SetIndexRangesForTables(sc *stmtctx.StatementCont
return builder
}

// SetCommonHandleRanges sets "KeyRanges" for "kv.Request" by converting common handle range
// SetHandleRanges sets "KeyRanges" for "kv.Request" by converting table handle range
// "ranges" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetCommonHandleRanges(sc *stmtctx.StatementContext, tid int64, ranges []*ranger.Range) *RequestBuilder {
if builder.err == nil {
builder.Request.KeyRanges, builder.err = CommonHandleRangesToKVRanges(sc, []int64{tid}, ranges)
}
return builder
func (builder *RequestBuilder) SetHandleRanges(sc *stmtctx.StatementContext, tid int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
return builder.SetHandleRangesForTables(sc, []int64{tid}, isCommonHandle, ranges, fb)
}

// SetCommonHandleRangesForTables sets "KeyRanges" for "kv.Request" by converting common handle range
// SetHandleRangesForTables sets "KeyRanges" for "kv.Request" by converting table handle range
// "ranges" to "KeyRanges" firstly for multiple tables.
func (builder *RequestBuilder) SetCommonHandleRangesForTables(sc *stmtctx.StatementContext, tid []int64, ranges []*ranger.Range) *RequestBuilder {
func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
if builder.err == nil {
builder.Request.KeyRanges, builder.err = CommonHandleRangesToKVRanges(sc, tid, ranges)
builder.Request.KeyRanges, builder.err = TableHandleRangesToKVRanges(sc, tid, isCommonHandle, ranges, fb)
}
return builder
}
Expand Down Expand Up @@ -255,13 +245,23 @@ func (builder *RequestBuilder) SetTiDBServerID(serverID uint64) *RequestBuilder
return builder
}

// TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables.
func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
if !isCommonHandle {
return tablesRangesToKVRanges(tid, ranges, fb), nil
}
return CommonHandleRangesToKVRanges(sc, tid, ranges)
}

// TableRangesToKVRanges converts table ranges to "KeyRange".
// Note this function should not be exported, but currently
// br refers to it, so have to keep it.
func TableRangesToKVRanges(tid int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange {
return TablesRangesToKVRanges([]int64{tid}, ranges, fb)
return tablesRangesToKVRanges([]int64{tid}, ranges, fb)
}

// TablesRangesToKVRanges converts table ranges to "KeyRange".
func TablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange {
// tablesRangesToKVRanges converts table ranges to "KeyRange".
func tablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange {
if fb == nil || fb.Hist == nil {
return tableRangesToKVRangesWithoutSplit(tids, ranges)
}
Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
},
}

actual, err := (&RequestBuilder{}).SetTableRanges(12, ranges, nil).
actual, err := (&RequestBuilder{}).SetHandleRanges(nil, 12, false, ranges, nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
Expand Down
9 changes: 2 additions & 7 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
var builder distsql.RequestBuilder
var kvReqBuilder *distsql.RequestBuilder
if e.isCommonHandle && e.idxInfo.Primary {
kvReqBuilder = builder.SetCommonHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, ranges)
kvReqBuilder = builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, true, ranges, nil)
} else {
kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, e.idxInfo.ID, ranges)
}
Expand Down Expand Up @@ -469,12 +469,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {

func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
var reqBuilder *distsql.RequestBuilder
if e.handleCols != nil && !e.handleCols.IsInt() {
reqBuilder = builder.SetCommonHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, ranges)
} else {
reqBuilder = builder.SetTableRangesForTables(e.tableID.CollectIDs, ranges, nil)
}
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, e.tableID.CollectIDs, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil)
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := reqBuilder.
Expand Down
13 changes: 4 additions & 9 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3337,16 +3337,11 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(int64) ([]kv.KeyRange
for _, p := range h.partitions {
pid := p.GetPhysicalID()
meta := p.Meta()
if meta != nil && meta.IsCommonHandle {
kvRange, err := distsql.CommonHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, h.ranges)
if err != nil {
return nil, err
}
ret = append(ret, kvRange...)
} else {
kvRange := distsql.TableRangesToKVRanges(pid, h.ranges, nil)
ret = append(ret, kvRange...)
kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, h.ranges, nil)
if err != nil {
return nil, err
}
ret = append(ret, kvRange...)
}
return ret, nil
}
Expand Down
9 changes: 7 additions & 2 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,15 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
}

ranges := ranger.FullIntRange(false)
var ranges []*ranger.Range
if c.TableInfo.IsCommonHandle {
ranges = ranger.FullNotNullRange()
} else {
ranges = ranger.FullIntRange(false)
}

var builder distsql.RequestBuilder
return builder.SetTableRanges(tableID, ranges, nil).
return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency()).
Expand Down
12 changes: 8 additions & 4 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type MPPGather struct {
}

func (e *MPPGather) constructMPPTasksImpl(ctx context.Context, p *plannercore.Fragment) ([]*kv.MPPTask, error) {
isCommonHandle := p.TableScan.Table.IsCommonHandle
if p.TableScan.Table.GetPartitionInfo() == nil {
return e.constructSinglePhysicalTable(ctx, p.TableScan.Table.ID, p.TableScan.Ranges)
return e.constructSinglePhysicalTable(ctx, p.TableScan.Table.ID, isCommonHandle, p.TableScan.Ranges)
}
tmp, _ := e.is.TableByID(p.TableScan.Table.ID)
tbl := tmp.(table.PartitionedTable)
Expand All @@ -65,7 +66,7 @@ func (e *MPPGather) constructMPPTasksImpl(ctx context.Context, p *plannercore.Fr
}
allTasks := make([]*kv.MPPTask, 0)
for _, part := range partitions {
partTasks, err := e.constructSinglePhysicalTable(ctx, part.GetPhysicalID(), p.TableScan.Ranges)
partTasks, err := e.constructSinglePhysicalTable(ctx, part.GetPhysicalID(), isCommonHandle, p.TableScan.Ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -75,8 +76,11 @@ func (e *MPPGather) constructMPPTasksImpl(ctx context.Context, p *plannercore.Fr
}

// single physical table means a table without partitions or a single partition in a partition table.
func (e *MPPGather) constructSinglePhysicalTable(ctx context.Context, tableID int64, ranges []*ranger.Range) ([]*kv.MPPTask, error) {
kvRanges := distsql.TableRangesToKVRanges(tableID, ranges, nil)
func (e *MPPGather) constructSinglePhysicalTable(ctx context.Context, tableID int64, isCommonHandle bool, ranges []*ranger.Range) ([]*kv.MPPTask, error) {
kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{tableID}, isCommonHandle, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
req := &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,8 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
return nil, err
}
reqBuilder = builder.SetKeyRanges(kvRange)
} else if e.table.Meta() != nil && e.table.Meta().IsCommonHandle {
reqBuilder = builder.SetCommonHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), ranges)
} else {
reqBuilder = builder.SetTableRanges(getPhysicalTableID(e.table), ranges, e.feedback)
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
kvReq, err := reqBuilder.
SetDAGRequest(e.dagPB).
Expand Down
5 changes: 4 additions & 1 deletion planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType)
executorID := ""
if storeType == kv.TiFlash && p.IsGlobalRead {
tsExec.NextReadEngine = tipb.EngineType_TiFlash
ranges := distsql.TableRangesToKVRanges(tsExec.TableId, p.Ranges, nil)
ranges, err := distsql.TableHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tsExec.TableId}, p.Table.IsCommonHandle, p.Ranges, nil)
if err != nil {
return nil, err
}
for _, keyRange := range ranges {
tsExec.Ranges = append(tsExec.Ranges, tipb.KeyRange{Low: keyRange.StartKey, High: keyRange.EndKey})
}
Expand Down

0 comments on commit f5a3a38

Please sign in to comment.