plan, executor: make Analyze a Plan (pingcap#3130)
* executor: remove indexExec's dependency on indexPlan

* plan, executor: make Analyze a Plan

* address comment

* address comment

* fix gofmt
alivxxx authored Apr 26, 2017
1 parent 2693f26 commit c237781
Showing 14 changed files with 179 additions and 208 deletions.
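
The gist of the change: Analyze is now a plan node that carries its analyze tasks directly, and buildAnalyze constructs the table and index scan executors for those tasks itself (via the new buildTableScanForAnalyze and buildIndexScanForAnalyze helpers) instead of slicing v.Children() and building child plans. As a rough sketch only, this is approximately the shape the plan.Analyze node takes, inferred from how executor/builder.go consumes it in this diff; the struct names and field layout below are assumptions, not the committed definitions.

package plan // sketch, not part of this diff

import "github.com/pingcap/tidb/model"

// AnalyzeColumnsTask describes one table whose primary key or columns get analyzed.
// Only the field names TableInfo, PKInfo and ColsInfo appear in the diff; the rest is guessed.
type AnalyzeColumnsTask struct {
	TableInfo *model.TableInfo
	PKInfo    *model.ColumnInfo   // set for entries in PkTasks
	ColsInfo  []*model.ColumnInfo // set for entries in ColTasks
}

// AnalyzeIndexTask describes one index to analyze.
type AnalyzeIndexTask struct {
	TableInfo *model.TableInfo
	IndexInfo *model.IndexInfo
}

// Analyze carries the analyze tasks on the plan node itself; the real type
// presumably also embeds the common plan base, omitted here for brevity.
type Analyze struct {
	PkTasks  []AnalyzeColumnsTask
	ColTasks []AnalyzeColumnsTask
	IdxTasks []AnalyzeIndexTask
}

This is also why XSelectIndexExec in distsql.go drops its indexPlan field in favor of plain schema, ranges, index and columns fields: buildIndexScanForAnalyze can then construct it without a PhysicalIndexScan plan.
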
2 changes: 1 addition & 1 deletion executor/analyze.go
@@ -174,7 +174,7 @@ func analyzeColumns(exec *XSelectTableExec) analyzeResult {
}

func analyzeIndex(exec *XSelectIndexExec) analyzeResult {
count, hg, err := statistics.BuildIndex(exec.ctx, defaultBucketCount, exec.indexPlan.Index.ID, &recordSet{executor: exec})
count, hg, err := statistics.BuildIndex(exec.ctx, defaultBucketCount, exec.index.ID, &recordSet{executor: exec})
return analyzeResult{tableID: exec.tableInfo.ID, hist: []*statistics.Histogram{hg}, count: count, isIndex: 1, err: err}
}

112 changes: 85 additions & 27 deletions executor/builder.go
@@ -343,9 +343,9 @@ func (b *executorBuilder) buildUnionScanExec(v *plan.PhysicalUnionScan) Executor
us.conditions = v.Conditions
us.buildAndSortAddedRows(x.table, x.asName)
case *XSelectIndexExec:
us.desc = x.indexPlan.Desc
for _, ic := range x.indexPlan.Index.Columns {
for i, col := range x.indexPlan.Schema().Columns {
us.desc = x.desc
for _, ic := range x.index.Columns {
for i, col := range x.schema.Columns {
if col.ColName.L == ic.Name.L {
us.usedIndex = append(us.usedIndex, i)
break
@@ -582,18 +582,26 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
client := b.ctx.GetClient()
supportDesc := client.SupportRequestType(kv.ReqTypeIndex, kv.ReqSubTypeDesc)
e := &XSelectIndexExec{
tableInfo: v.Table,
ctx: b.ctx,
supportDesc: supportDesc,
asName: v.TableAsName,
table: table,
indexPlan: v,
singleReadMode: !v.DoubleRead,
startTS: startTS,
where: v.TableConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncsPB,
byItems: v.GbyItemsPB,
tableInfo: v.Table,
ctx: b.ctx,
supportDesc: supportDesc,
asName: v.TableAsName,
table: table,
singleReadMode: !v.DoubleRead,
startTS: startTS,
where: v.TableConditionPBExpr,
schema: v.Schema(),
ranges: v.Ranges,
limitCount: v.LimitCount,
sortItemsPB: v.SortItemsPB,
columns: v.Columns,
index: v.Index,
desc: v.Desc,
outOfOrder: v.OutOfOrder,
indexConditionPBExpr: v.IndexConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncsPB,
byItems: v.GbyItemsPB,
}
vars := b.ctx.GetSessionVars()
if v.OutOfOrder {
@@ -605,9 +613,9 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
}
if !e.aggregate && e.singleReadMode {
// Single read index result has the schema of full index columns.
schemaColumns := make([]*expression.Column, len(e.indexPlan.Index.Columns))
for i, col := range e.indexPlan.Index.Columns {
colInfo := e.indexPlan.Table.Columns[col.Offset]
schemaColumns := make([]*expression.Column, len(e.index.Columns))
for i, col := range e.index.Columns {
colInfo := e.tableInfo.Columns[col.Offset]
schemaColumns[i] = &expression.Column{
Index: i,
ColName: col.Name,
@@ -743,22 +751,72 @@ func (b *executorBuilder) buildCache(v *plan.Cache) Executor {
}
}

func (b *executorBuilder) buildTableScanForAnalyze(tblInfo *model.TableInfo, cols []*model.ColumnInfo) Executor {
startTS := b.getStartTS()
if b.err != nil {
return nil
}
table, _ := b.is.TableByID(tblInfo.ID)
schema := expression.NewSchema(expression.ColumnInfos2Columns(tblInfo.Name, cols)...)
ranges := []types.IntColumnRange{{math.MinInt64, math.MaxInt64}}
e := &XSelectTableExec{
tableInfo: tblInfo,
ctx: b.ctx,
startTS: startTS,
table: table,
schema: schema,
Columns: cols,
ranges: ranges,
}
return e
}

func (b *executorBuilder) buildIndexScanForAnalyze(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) Executor {
startTS := b.getStartTS()
if b.err != nil {
return nil
}
table, _ := b.is.TableByID(tblInfo.ID)
cols := make([]*model.ColumnInfo, len(idxInfo.Columns))
for i, col := range idxInfo.Columns {
cols[i] = tblInfo.Columns[col.Offset]
}
schema := expression.NewSchema(expression.ColumnInfos2Columns(tblInfo.Name, cols)...)
idxRange := &types.IndexRange{LowVal: []types.Datum{types.MinNotNullDatum()}, HighVal: []types.Datum{types.MaxValueDatum()}}
scanConcurrency := b.ctx.GetSessionVars().IndexSerialScanConcurrency
e := &XSelectIndexExec{
tableInfo: tblInfo,
ctx: b.ctx,
table: table,
singleReadMode: true,
startTS: startTS,
idxColsSchema: schema,
schema: schema,
ranges: []*types.IndexRange{idxRange},
columns: cols,
index: idxInfo,
outOfOrder: false,
scanConcurrency: scanConcurrency,
}
return e
}

func (b *executorBuilder) buildAnalyze(v *plan.Analyze) Executor {
e := &AnalyzeExec{
ctx: b.ctx,
tasks: make([]analyzeTask, 0, len(v.Children())),
}
pkTasks := v.Children()[:len(v.PkTasks)]
for _, task := range pkTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: pkTask, src: b.build(task)})
for _, task := range v.PkTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: pkTask,
src: b.buildTableScanForAnalyze(task.TableInfo, []*model.ColumnInfo{task.PKInfo})})
}
colTasks := v.Children()[len(v.PkTasks) : len(v.PkTasks)+len(v.ColTasks)]
for _, task := range colTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: colTask, src: b.build(task)})
for _, task := range v.ColTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: colTask,
src: b.buildTableScanForAnalyze(task.TableInfo, task.ColsInfo)})
}
idxTasks := v.Children()[len(v.PkTasks)+len(v.ColTasks):]
for _, task := range idxTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: idxTask, src: b.build(task)})
for _, task := range v.IdxTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: idxTask,
src: b.buildIndexScanForAnalyze(task.TableInfo, task.IndexInfo)})
}
return e
}
76 changes: 41 additions & 35 deletions executor/distsql.go
@@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
@@ -338,8 +337,6 @@ type XSelectIndexExec struct {
isMemDB bool
singleReadMode bool

indexPlan *plan.PhysicalIndexScan

// Variables only used for single read.
result distsql.SelectResult
partialResult distsql.PartialResult
@@ -351,9 +348,18 @@ type XSelectIndexExec struct {
taskCurr *lookupTableTask
handleCount uint64 // returned handle count in double read.

where *tipb.Expr
startTS uint64
returnedRows uint64 // returned row count
where *tipb.Expr
startTS uint64
returnedRows uint64 // returned row count
schema *expression.Schema
ranges []*types.IndexRange
limitCount *int64
sortItemsPB []*tipb.ByItem
columns []*model.ColumnInfo
index *model.IndexInfo
desc bool
outOfOrder bool
indexConditionPBExpr *tipb.Expr

/*
The following attributes are used for aggregation push down.
@@ -373,7 +379,7 @@ type XSelectIndexExec struct {

// Schema implements Exec Schema interface.
func (e *XSelectIndexExec) Schema() *expression.Schema {
return e.indexPlan.Schema()
return e.schema
}

// Close implements Exec Close interface.
@@ -396,7 +402,7 @@ func (e *XSelectIndexExec) Close() error {

// Next implements the Executor Next interface.
func (e *XSelectIndexExec) Next() (*Row, error) {
if e.indexPlan.LimitCount != nil && len(e.indexPlan.SortItemsPB) == 0 && e.returnedRows >= uint64(*e.indexPlan.LimitCount) {
if e.limitCount != nil && len(e.sortItemsPB) == 0 && e.returnedRows >= uint64(*e.limitCount) {
return nil, nil
}
e.returnedRows++
@@ -483,17 +489,17 @@ func decodeRawValues(values []types.Datum, schema *expression.Schema) error {
}

func (e *XSelectIndexExec) indexRowToTableRow(handle int64, indexRow []types.Datum) []types.Datum {
tableRow := make([]types.Datum, len(e.indexPlan.Columns))
for i, tblCol := range e.indexPlan.Columns {
if table.ToColumn(tblCol).IsPKHandleColumn(e.indexPlan.Table) {
tableRow := make([]types.Datum, len(e.columns))
for i, tblCol := range e.columns {
if table.ToColumn(tblCol).IsPKHandleColumn(e.tableInfo) {
if mysql.HasUnsignedFlag(tblCol.FieldType.Flag) {
tableRow[i] = types.NewUintDatum(uint64(handle))
} else {
tableRow[i] = types.NewIntDatum(handle)
}
continue
}
for j, idxCol := range e.indexPlan.Index.Columns {
for j, idxCol := range e.index.Columns {
if tblCol.Name.L == idxCol.Name.L {
tableRow[i] = indexRow[j]
break
@@ -546,7 +552,7 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) {

func (e *XSelectIndexExec) slowQueryInfo(duration time.Duration) string {
return fmt.Sprintf("time: %v, table: %s(%d), index: %s(%d), partials: %d, concurrency: %d, rows: %d, handles: %d",
duration, e.tableInfo.Name, e.tableInfo.ID, e.indexPlan.Index.Name, e.indexPlan.Index.ID,
duration, e.tableInfo.Name, e.tableInfo.ID, e.index.Name, e.index.ID,
e.partialCount, e.scanConcurrency, e.returnedRows, e.handleCount)
}

@@ -601,36 +607,36 @@ func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
selIdxReq.StartTs = e.startTS
selIdxReq.TimeZoneOffset = timeZoneOffset()
selIdxReq.Flags = statementContextToFlags(e.ctx.GetSessionVars().StmtCtx)
selIdxReq.IndexInfo = distsql.IndexToProto(e.table.Meta(), e.indexPlan.Index)
if e.indexPlan.Desc {
selIdxReq.OrderBy = []*tipb.ByItem{{Desc: e.indexPlan.Desc}}
selIdxReq.IndexInfo = distsql.IndexToProto(e.table.Meta(), e.index)
if e.desc {
selIdxReq.OrderBy = []*tipb.ByItem{{Desc: e.desc}}
}
// If the index is single read, we can push topn down.
if e.singleReadMode {
selIdxReq.Limit = e.indexPlan.LimitCount
selIdxReq.Limit = e.limitCount
// If sortItemsPB is empty, the Desc may be true and we shouldn't overwrite it.
if len(e.indexPlan.SortItemsPB) > 0 {
selIdxReq.OrderBy = e.indexPlan.SortItemsPB
if len(e.sortItemsPB) > 0 {
selIdxReq.OrderBy = e.sortItemsPB
}
} else if e.where == nil && len(e.indexPlan.SortItemsPB) == 0 {
} else if e.where == nil && len(e.sortItemsPB) == 0 {
// If the index is double read but table scan has no filter or topn, we can push limit down to index.
selIdxReq.Limit = e.indexPlan.LimitCount
selIdxReq.Limit = e.limitCount
}
selIdxReq.Where = e.indexPlan.IndexConditionPBExpr
selIdxReq.Where = e.indexConditionPBExpr
if e.singleReadMode {
selIdxReq.Aggregates = e.aggFuncs
selIdxReq.GroupBy = e.byItems
}
fieldTypes := make([]*types.FieldType, len(e.indexPlan.Index.Columns))
for i, v := range e.indexPlan.Index.Columns {
fieldTypes := make([]*types.FieldType, len(e.index.Columns))
for i, v := range e.index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
sc := e.ctx.GetSessionVars().StmtCtx
keyRanges, err := indexRangesToKVRanges(sc, e.table.Meta().ID, e.indexPlan.Index.ID, e.indexPlan.Ranges, fieldTypes)
keyRanges, err := indexRangesToKVRanges(sc, e.table.Meta().ID, e.index.ID, e.ranges, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
return distsql.Select(e.ctx.GetClient(), e.ctx.GoCtx(), selIdxReq, keyRanges, e.scanConcurrency, !e.indexPlan.OutOfOrder)
return distsql.Select(e.ctx.GetClient(), e.ctx.GoCtx(), selIdxReq, keyRanges, e.scanConcurrency, !e.outOfOrder)
}

func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask {
@@ -647,7 +653,7 @@ func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask {
}

var indexOrder map[int64]int
if !e.indexPlan.OutOfOrder {
if !e.outOfOrder {
// Save the index order.
indexOrder = make(map[int64]int, len(handles))
for i, h := range handles {
@@ -692,10 +698,10 @@ func (e *XSelectIndexExec) executeTask(task *lookupTableTask) error {
if err != nil {
return errors.Trace(err)
}
if !e.indexPlan.OutOfOrder {
if !e.outOfOrder {
// Restore the index order.
sorter := &rowsSorter{order: task.indexOrder, rows: task.rows}
if e.indexPlan.Desc && !e.supportDesc {
if e.desc && !e.supportDesc {
sort.Sort(sort.Reverse(sorter))
} else {
sort.Sort(sorter)
@@ -744,7 +750,7 @@ func (e *XSelectIndexExec) extractRowsFromPartialResult(t table.Table, partialRe
if err != nil {
return nil, errors.Trace(err)
}
row := resultRowToRow(t, h, values, e.indexPlan.TableAsName)
row := resultRowToRow(t, h, values, e.asName)
rows = append(rows, row)
}
return rows, nil
@@ -753,18 +759,18 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult, error) {
func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult, error) {
// The handles are not in original index order, so we can't push limit here.
selTableReq := new(tipb.SelectRequest)
if e.indexPlan.OutOfOrder {
selTableReq.Limit = e.indexPlan.LimitCount
selTableReq.OrderBy = e.indexPlan.SortItemsPB
if e.outOfOrder {
selTableReq.Limit = e.limitCount
selTableReq.OrderBy = e.sortItemsPB
}
selTableReq.StartTs = e.startTS
selTableReq.TimeZoneOffset = timeZoneOffset()
selTableReq.Flags = statementContextToFlags(e.ctx.GetSessionVars().StmtCtx)
selTableReq.TableInfo = &tipb.TableInfo{
TableId: e.table.Meta().ID,
}
selTableReq.TableInfo.Columns = distsql.ColumnsToProto(e.indexPlan.Columns, e.table.Meta().PKIsHandle)
err := setPBColumnsDefaultValue(e.ctx, selTableReq.TableInfo.Columns, e.indexPlan.Columns)
selTableReq.TableInfo.Columns = distsql.ColumnsToProto(e.columns, e.table.Meta().PKIsHandle)
err := setPBColumnsDefaultValue(e.ctx, selTableReq.TableInfo.Columns, e.columns)
if err != nil {
return nil, errors.Trace(err)
}
13 changes: 7 additions & 6 deletions executor/executor.go
@@ -512,15 +512,16 @@ func (e *SelectionExec) initController() error {
}
x.ranges = ranges
case *XSelectIndexExec:
x.indexPlan.AccessCondition, newConds, _, _ = plan.DetachIndexScanConditions(newConds, x.indexPlan.Index)
idxConds, tblConds := plan.DetachIndexFilterConditions(newConds, x.indexPlan.Index.Columns, x.indexPlan.Table)
x.indexPlan.IndexConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, idxConds, client)
x.indexPlan.TableConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, tblConds, client)
err := plan.BuildIndexRange(sc, x.indexPlan)
accessCondition, newConds, _, accessInAndEqCount := plan.DetachIndexScanConditions(newConds, x.index)
idxConds, tblConds := plan.DetachIndexFilterConditions(newConds, x.index.Columns, x.tableInfo)
x.indexConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, idxConds, client)
tableConditionPBExpr, _, _ := plan.ExpressionsToPB(sc, tblConds, client)
var err error
x.ranges, err = plan.BuildIndexRange(sc, x.tableInfo, x.index, accessInAndEqCount, accessCondition)
if err != nil {
return errors.Trace(err)
}
x.where = x.indexPlan.TableConditionPBExpr
x.where = tableConditionPBExpr
default:
return errors.Errorf("Error type of Executor: %T", x)
}
2 changes: 1 addition & 1 deletion executor/union_scan.go
@@ -243,7 +243,7 @@ func (us *UnionScanExec) buildAndSortAddedRows(t table.Table, asName *model.CISt
if t, ok := us.Src.(*XSelectTableExec); ok {
columns = t.Columns
} else {
columns = us.Src.(*XSelectIndexExec).indexPlan.Columns
columns = us.Src.(*XSelectIndexExec).columns
}
for _, col := range columns {
newData = append(newData, data[col.Offset])