ddl: fix alter add index on virtual column bug (#7575) #8655

Merged (2 commits), Dec 12, 2018
Changes from 1 commit
74 changes: 49 additions & 25 deletions ddl/index.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
@@ -35,6 +36,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
log "github.com/sirupsen/logrus"
)
@@ -439,22 +441,22 @@ type indexRecord struct {
}

type addIndexWorker struct {
id int
ddlWorker *worker
batchCnt int
sessCtx sessionctx.Context
taskCh chan *reorgIndexTask
resultCh chan *addIndexResult
index table.Index
table table.Table
colFieldMap map[int64]*types.FieldType
closed bool
priority int
id int
ddlWorker *worker
batchCnt int
sessCtx sessionctx.Context
taskCh chan *reorgIndexTask
resultCh chan *addIndexResult
index table.Index
table table.Table
closed bool
priority int

// The following attributes are used to reduce memory allocation.
defaultVals []types.Datum
idxRecords []*indexRecord
rowMap map[int64]types.Datum
rowDecoder decoder.RowDecoder
idxKeyBufs [][]byte
batchCheckKeys []kv.Key
distinctCheckFlags []bool
@@ -493,8 +495,9 @@ func mergeAddIndexCtxToResult(taskCtx *addIndexTaskContext, result *addIndexResu
result.scanCount += taskCtx.scanCount
}

func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, colFieldMap map[int64]*types.FieldType) *addIndexWorker {
func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column) *addIndexWorker {
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
rowDecoder := decoder.NewRowDecoder(t.Cols(), decodeColMap)
return &addIndexWorker{
id: id,
ddlWorker: worker,
@@ -504,10 +507,10 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
resultCh: make(chan *addIndexResult, 1),
index: index,
table: t,
colFieldMap: colFieldMap,
rowDecoder: rowDecoder,
priority: kv.PriorityLow,
defaultVals: make([]types.Datum, len(t.Cols())),
rowMap: make(map[int64]types.Datum, len(colFieldMap)),
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
}
}

@@ -523,7 +526,8 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor
t := w.table
cols := t.Cols()
idxInfo := w.index.Meta()
_, err := tablecodec.DecodeRowWithMap(rawRecord, w.colFieldMap, time.UTC, w.rowMap)
sysZone := timeutil.SystemLocation()
_, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, rawRecord, time.UTC, sysZone, w.rowMap)
if err != nil {
return nil, errors.Trace(err)
}
@@ -552,9 +556,8 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor

if idxColumnVal.Kind() == types.KindMysqlTime {
t := idxColumnVal.GetMysqlTime()
zone := timeutil.SystemLocation()
if t.Type == mysql.TypeTimestamp && zone != time.UTC {
err := t.ConvertTimeZone(zone, time.UTC)
if t.Type == mysql.TypeTimestamp && sysZone != time.UTC {
err := t.ConvertTimeZone(sysZone, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
@@ -847,14 +850,31 @@ func (w *addIndexWorker) run(d *ddlCtx) {
log.Infof("[ddl-reorg] worker[%v] exit", w.id)
}

func makeupIndexColFieldMap(t table.Table, indexInfo *model.IndexInfo) map[int64]*types.FieldType {
func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table, indexInfo *model.IndexInfo) (map[int64]decoder.Column, error) {
cols := t.Cols()
colFieldMap := make(map[int64]*types.FieldType, len(indexInfo.Columns))
decodeColMap := make(map[int64]decoder.Column, len(indexInfo.Columns))
for _, v := range indexInfo.Columns {
col := cols[v.Offset]
colFieldMap[col.ID] = &col.FieldType
tpExpr := decoder.Column{
Info: col.ToInfo(),
}
if col.IsGenerated() && !col.GeneratedStored {
for _, c := range cols {
if _, ok := col.Dependences[c.Name.L]; ok {
decodeColMap[c.ID] = decoder.Column{
Info: c.ToInfo(),
}
}
}
e, err := expression.ParseSimpleExprCastWithTableInfo(sessCtx, col.GeneratedExprString, t.Meta(), &col.FieldType)
if err != nil {
return nil, errors.Trace(err)
}
tpExpr.GenExpr = e
}
decodeColMap[col.ID] = tpExpr
}
return colFieldMap
return decodeColMap, nil
}

// splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
@@ -1057,19 +1077,23 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
colFieldMap := makeupIndexColFieldMap(t, indexInfo)
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
return errors.Trace(err)
}

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, colFieldMap)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err := w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
return errors.Trace(err)
}

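
The heart of the fix is that the backfill worker no longer decodes only the indexed columns: for a virtual generated column, makeupDecodeColMap also collects every column the generation expression depends on, together with the parsed (and cast) expression, so the worker can evaluate the index value itself. Below is a minimal, self-contained sketch of that dependency-expansion step, using simplified stand-in types rather than TiDB's real structs.

package main

import "fmt"

// colInfo is a simplified stand-in for TiDB's column metadata (illustrative only).
type colInfo struct {
	ID          int64
	Name        string
	IsVirtual   bool            // generated and not stored
	Dependences map[string]bool // columns the generation expression reads
}

// buildDecodeSet mirrors the shape of makeupDecodeColMap above: record every
// indexed column, and for a virtual generated column also record each column
// its expression depends on, so the backfill worker decodes enough of the
// stored row to evaluate the expression.
func buildDecodeSet(all, indexed []colInfo) map[int64]colInfo {
	decodeSet := make(map[int64]colInfo, len(indexed))
	for _, col := range indexed {
		if col.IsVirtual {
			for _, c := range all {
				if col.Dependences[c.Name] {
					decodeSet[c.ID] = c
				}
			}
		}
		decodeSet[col.ID] = col
	}
	return decodeSet
}

func main() {
	k := colInfo{ID: 1, Name: "k"}
	a := colInfo{ID: 2, Name: "a", IsVirtual: true, Dependences: map[string]bool{"k": true}}
	// Indexing the virtual column a requires decoding k as well.
	fmt.Println(buildDecodeSet([]colInfo{k, a}, []colInfo{a}))
}

Keying the map by column ID matches how the row decoder later looks values up when it evaluates the expression.
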
27 changes: 27 additions & 0 deletions executor/executor_test.go
@@ -273,6 +273,33 @@ func (s *testSuite) TestAdmin(c *C) {
tk.MustExec("ALTER TABLE t1 ADD COLUMN c4 bit(10) default 127;")
tk.MustExec("ALTER TABLE t1 ADD INDEX idx3 (c4);")
tk.MustExec("admin check table t1;")

// For adding an index on virtual columns.
tk.MustExec("drop table if exists t1;")
tk.MustExec(`create table t1 (
a int as (JSON_EXTRACT(k,'$.a')),
c double as (JSON_EXTRACT(k,'$.c')),
d decimal(20,10) as (JSON_EXTRACT(k,'$.d')),
e char(10) as (JSON_EXTRACT(k,'$.e')),
f date as (JSON_EXTRACT(k,'$.f')),
g time as (JSON_EXTRACT(k,'$.g')),
h datetime as (JSON_EXTRACT(k,'$.h')),
i timestamp as (JSON_EXTRACT(k,'$.i')),
j year as (JSON_EXTRACT(k,'$.j')),
k json);`)

tk.MustExec("insert into t1 set k='{\"a\": 100,\"c\":1.234,\"d\":1.2340000000,\"e\":\"abcdefg\",\"f\":\"2018-09-28\",\"g\":\"12:59:59\",\"h\":\"2018-09-28 12:59:59\",\"i\":\"2018-09-28 16:40:33\",\"j\":\"2018\"}';")
tk.MustExec("alter table t1 add index idx_a(a);")
tk.MustExec("alter table t1 add index idx_c(c);")
tk.MustExec("alter table t1 add index idx_d(d);")
tk.MustExec("alter table t1 add index idx_e(e);")
tk.MustExec("alter table t1 add index idx_f(f);")
tk.MustExec("alter table t1 add index idx_g(g);")
tk.MustExec("alter table t1 add index idx_h(h);")
tk.MustExec("alter table t1 add index idx_j(j);")
tk.MustExec("alter table t1 add index idx_i(i);")
tk.MustExec("alter table t1 add index idx_m(a,c,d,e,f,g,h,i,j);")
tk.MustExec("admin check table t1;")
}

func (s *testSuite) fillData(tk *testkit.TestKit, table string) {
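
The new TestAdmin cases exercise each supported column type end to end. The same scenario can also be reproduced against a running server with plain database/sql; a hedged sketch, assuming a local TiDB listening on 127.0.0.1:4000 with an empty root password and the go-sql-driver/mysql driver.

package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stmts := []string{
		"drop table if exists t1",
		"create table t1 (a int as (json_extract(k, '$.a')), k json)",
		`insert into t1 set k = '{"a": 100}'`,
		// Before this fix, the backfill decoded only stored column data and did
		// not evaluate the virtual column's expression, which could leave the
		// new index inconsistent with the table data (issue #7575).
		"alter table t1 add index idx_a(a)",
		"admin check table t1",
	}
	for _, s := range stmts {
		if _, err := db.Exec(s); err != nil {
			log.Fatalf("%q failed: %v", s, err)
		}
	}
	log.Println("index on virtual column passed admin check")
}
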
11 changes: 11 additions & 0 deletions expression/simple_rewriter.go
@@ -45,6 +45,17 @@ func ParseSimpleExprWithTableInfo(ctx sessionctx.Context, exprStr string, tableI
return RewriteSimpleExprWithTableInfo(ctx, tableInfo, expr)
}

// ParseSimpleExprCastWithTableInfo parses a simple expression string into an Expression.
// The returned expression is cast to the target field type.
func ParseSimpleExprCastWithTableInfo(ctx sessionctx.Context, exprStr string, tableInfo *model.TableInfo, targetFt *types.FieldType) (Expression, error) {
e, err := ParseSimpleExprWithTableInfo(ctx, exprStr, tableInfo)
if err != nil {
return nil, errors.Trace(err)
}
e = BuildCastFunction(ctx, e, targetFt)
return e, nil
}

// RewriteSimpleExprWithTableInfo rewrites simple ast.ExprNode to expression.Expression.
func RewriteSimpleExprWithTableInfo(ctx sessionctx.Context, tbl *model.TableInfo, expr ast.ExprNode) (Expression, error) {
dbName := model.NewCIStr(ctx.GetSessionVars().CurrentDB)
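
The cast step matters because the generation expression's natural result type (a JSON value from JSON_EXTRACT in the new test) generally differs from the virtual column's declared type, and index entries must hold values of the declared type; BuildCastFunction adds that conversion. A rough stand-alone analogy in plain Go, using encoding/json in place of TiDB's expression framework.

package main

import (
	"encoding/json"
	"fmt"
)

// extractAsInt mimics a column defined as "a int as (json_extract(k, '$.a'))":
// evaluate the expression over the JSON document, then convert the result to
// the column's declared type before it is used (for example in an index entry).
func extractAsInt(doc []byte, field string) (int64, error) {
	var m map[string]interface{}
	if err := json.Unmarshal(doc, &m); err != nil {
		return 0, err
	}
	v, ok := m[field].(float64) // encoding/json decodes numbers as float64
	if !ok {
		return 0, fmt.Errorf("field %q is missing or not a number", field)
	}
	// This conversion plays the role of the cast that
	// ParseSimpleExprCastWithTableInfo wires onto the parsed expression.
	return int64(v), nil
}

func main() {
	n, err := extractAsInt([]byte(`{"a": 100, "c": 1.234}`), "a")
	fmt.Println(n, err) // 100 <nil>
}
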
112 changes: 112 additions & 0 deletions util/rowDecoder/decoder.go
@@ -0,0 +1,112 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package decoder

import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

// Column contains the info and generated expr of column.
type Column struct {
Info *model.ColumnInfo
GenExpr expression.Expression
}

// RowDecoder decodes a byte slice into datums and evaluates the generated column value.
type RowDecoder struct {
mutRow chunk.MutRow
columns map[int64]Column
colTypes map[int64]*types.FieldType
haveGenColumn bool
}

// NewRowDecoder returns a new RowDecoder.
func NewRowDecoder(cols []*table.Column, decodeColMap map[int64]Column) RowDecoder {
colFieldMap := make(map[int64]*types.FieldType, len(decodeColMap))
haveGenCol := false
for id, col := range decodeColMap {
colFieldMap[id] = &col.Info.FieldType
if col.GenExpr != nil {
haveGenCol = true
}
}
if !haveGenCol {
return RowDecoder{
colTypes: colFieldMap,
}
}

tps := make([]*types.FieldType, len(cols))
for _, col := range cols {
tps[col.Offset] = &col.FieldType
}
return RowDecoder{
mutRow: chunk.MutRowFromTypes(tps),
columns: decodeColMap,
colTypes: colFieldMap,
haveGenColumn: haveGenCol,
}
}

// DecodeAndEvalRowWithMap decodes a byte slice into datums and evaluates the generated column value.
func (rd RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, b []byte, decodeLoc, sysLoc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
row, err := tablecodec.DecodeRowWithMap(b, rd.colTypes, decodeLoc, row)
if err != nil {
return nil, errors.Trace(err)
}
if !rd.haveGenColumn {
return row, nil
}

for id, v := range row {
rd.mutRow.SetValue(rd.columns[id].Info.Offset, v.GetValue())
}
for id, col := range rd.columns {
if col.GenExpr == nil {
continue
}
// Eval the column value
val, err := col.GenExpr.Eval(rd.mutRow.ToRow())
if err != nil {
return nil, errors.Trace(err)
}
val, err = table.CastValue(ctx, val, col.Info)
if err != nil {
return nil, errors.Trace(err)
}

if val.Kind() == types.KindMysqlTime {
t := val.GetMysqlTime()
if t.Type == mysql.TypeTimestamp && sysLoc != time.UTC {
err := t.ConvertTimeZone(sysLoc, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
val.SetMysqlTime(t)
}
}
row[id] = val
}
return row, nil
}
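
Putting it together, the backfill path builds one decoder per worker (decoder.NewRowDecoder) and then calls DecodeAndEvalRowWithMap for each scanned row. The toy below mirrors that decode-then-evaluate shape with plain maps and closures standing in for TiDB's datums and expressions; it is a sketch of the idea, not the actual API.

package main

import "fmt"

// genCol loosely mirrors decoder.Column: a column name plus an optional
// generation function standing in for the parsed expression.
type genCol struct {
	name string
	gen  func(row map[string]interface{}) interface{} // nil for stored columns
}

// decodeAndEval loosely mirrors DecodeAndEvalRowWithMap: start from the values
// physically stored in the row, then fill in each virtual column by evaluating
// its expression over those values.
func decodeAndEval(stored map[string]interface{}, cols []genCol) map[string]interface{} {
	row := make(map[string]interface{}, len(cols))
	for k, v := range stored {
		row[k] = v
	}
	for _, c := range cols {
		if c.gen != nil {
			row[c.name] = c.gen(row)
		}
	}
	return row
}

func main() {
	cols := []genCol{
		{name: "k"},
		{name: "a", gen: func(row map[string]interface{}) interface{} {
			return row["k"].(map[string]interface{})["a"] // stands in for json_extract(k, '$.a')
		}},
	}
	stored := map[string]interface{}{"k": map[string]interface{}{"a": 100}}
	fmt.Println(decodeAndEval(stored, cols))
}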