ddl: Remove the EncodeRecordKey call and add more log message (pingcap#1715)

* ddl: update test
zimulala authored Sep 12, 2016
1 parent 6dec141 commit e800cf0
Showing 4 changed files with 18 additions and 15 deletions.
3 changes: 2 additions & 1 deletion ddl/column.go
@@ -322,6 +322,7 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
}

func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error {
log.Infof("[ddl] backfill column handles %v", len(handles))
var (
defaultVal types.Datum
err error
@@ -339,7 +340,7 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha
colMap[col.ID] = &col.FieldType
}
for _, handle := range handles {
log.Info("[ddl] backfill column...", handle)
log.Debug("[ddl] backfill column...", handle)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil {
return errors.Trace(err)
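The column.go change above swaps a per-row Info line for a batch-level Infof plus a per-row Debug line. A minimal sketch of that logging pattern, using Go's standard log package and hypothetical names rather than TiDB's logger:

package main

import "log"

// backfillBatch sketches the logging pattern from the diff above: one
// summary line per batch of handles, and a per-row line that would
// normally be emitted only at debug level. Names here are placeholders.
func backfillBatch(handles []int64, debugEnabled bool) {
	log.Printf("[ddl] backfill column handles %v", len(handles))
	for _, handle := range handles {
		if debugEnabled {
			log.Printf("[ddl] backfill column... %d", handle)
		}
		// per-row backfill work elided
	}
}

func main() {
	backfillBatch([]int64{1, 2, 3}, false)
}

Summarizing per batch keeps log volume proportional to the number of batches rather than the number of rows, while the per-row detail remains available under debug logging.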
3 changes: 2 additions & 1 deletion ddl/ddl_db_test.go
@@ -78,8 +78,9 @@ func (s *testDBSuite) SetUpSuite(c *C) {
}

func (s *testDBSuite) TearDownSuite(c *C) {
s.db.Close()
localstore.MockRemoteStore = false

s.db.Close()
s.s.Close()
}

21 changes: 9 additions & 12 deletions ddl/index.go
@@ -310,7 +310,8 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
}
}

func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]types.Datum, error) {
func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) (
kv.Key, []types.Datum, error) {
// fetch datas
cols := t.Cols()
colMap := make(map[int64]*types.FieldType)
@@ -321,18 +322,18 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo
rowKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), handle)
rowVal, err := txn.Get(rowKey)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
row, err := tablecodec.DecodeRow(rowVal, colMap)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
vals := make([]types.Datum, 0, len(indexInfo.Columns))
for _, v := range indexInfo.Columns {
col := cols[v.Offset]
vals = append(vals, row[col.ID])
}
return vals, nil
return rowKey, vals, nil
}

const maxBatchSize = 1024
@@ -365,22 +366,19 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo

func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ([]int64, error) {
ver := kv.Version{Ver: version}

snap, err := d.store.GetSnapshot(ver)
if err != nil {
return nil, errors.Trace(err)
}

firstKey := t.RecordKey(seekHandle)

it, err := snap.Seek(firstKey)
if err != nil {
return nil, errors.Trace(err)
}
defer it.Close()

handles := make([]int64, 0, maxBatchSize)

for it.Valid() {
if !it.Key().HasPrefix(t.RecordPrefix()) {
break
@@ -392,13 +390,12 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (
return nil, errors.Trace(err)
}

rk := t.RecordKey(handle)

handles = append(handles, handle)
if len(handles) == maxBatchSize {
break
}

rk := t.RecordKey(handle)
err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
if terror.ErrorEqual(err, kv.ErrNotExist) {
break
@@ -412,16 +409,17 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (

func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64, reorgInfo *reorgInfo) error {
kvX := tables.NewIndex(t.Meta(), indexInfo)
log.Infof("[ddl] backfill index %v rows ", len(handles))

for _, handle := range handles {
log.Debug("[ddl] building index...", handle)
log.Debug("[ddl] backfill index...", handle)

err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil {
return errors.Trace(err)
}

vals, err1 := fetchRowColVals(txn, t, handle, indexInfo)
rowKey, vals, err1 := fetchRowColVals(txn, t, handle, indexInfo)
if terror.ErrorEqual(err1, kv.ErrNotExist) {
// row doesn't exist, skip it.
return nil
@@ -437,7 +435,6 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand
// index already exists, skip it.
return nil
}
rowKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), handle)
err1 = txn.LockKeys(rowKey)
if err1 != nil {
return errors.Trace(err1)
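The index.go change above makes fetchRowColVals return the encoded row key alongside the column values, so backfillTableIndex locks the key it already has instead of calling tablecodec.EncodeRecordKey a second time. A self-contained sketch of the same pattern with simplified stand-in types (key, datum, and encodeRecordKey are placeholders, not TiDB's APIs):

package main

import "fmt"

type key []byte
type datum interface{}

// encodeRecordKey stands in for tablecodec.EncodeRecordKey: it builds the
// row key for a handle under the given table prefix.
func encodeRecordKey(prefix key, handle int64) key {
	return append(append(key{}, prefix...), []byte(fmt.Sprintf("_r%d", handle))...)
}

// fetchRowColVals mirrors the new signature: the row key is encoded once
// and returned to the caller together with the decoded column values.
func fetchRowColVals(prefix key, handle int64) (key, []datum, error) {
	rowKey := encodeRecordKey(prefix, handle)
	vals := []datum{handle} // real decoding of the row value is elided
	return rowKey, vals, nil
}

// backfillOneRow shows the caller side: it locks the returned key rather
// than re-encoding it from the handle.
func backfillOneRow(prefix key, handle int64, lockKeys func(key) error) error {
	rowKey, vals, err := fetchRowColVals(prefix, handle)
	if err != nil {
		return err
	}
	_ = vals // index entry creation elided
	return lockKeys(rowKey)
}

func main() {
	lock := func(k key) error {
		fmt.Printf("lock %q\n", k)
		return nil
	}
	if err := backfillOneRow(key("t1"), 7, lock); err != nil {
		fmt.Println(err)
	}
}

Returning the key from the fetch helper avoids encoding the same record key twice per row and keeps the key used for the Get and the later LockKeys guaranteed identical.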
6 changes: 5 additions & 1 deletion ddl/reorg.go
@@ -130,12 +130,15 @@ func (d *ddl) runReorgJob(f func() error) error {
// wait reorganization job done or timeout
select {
case err := <-d.reorgDoneCh:
log.Info("[ddl] run reorg job done")
d.reorgDoneCh = nil
return errors.Trace(err)
case <-d.quitCh:
log.Info("[ddl] run reorg job ddl quit")
// we return errWaitReorgTimeout here too, so that outer loop will break.
return errWaitReorgTimeout
case <-time.After(waitTimeout):
log.Infof("[ddl] run reorg job wait timeout :%v", waitTimeout)
// if timeout, we will return, check the owner and retry to wait job done again.
return errWaitReorgTimeout
}
@@ -174,8 +177,8 @@ func (d *ddl) delKeysWithPrefix(prefix kv.Key, jobType JobType) error {
if err != nil {
return errors.Trace(err)
}

defer iter.Close()

for i := 0; i < maxBatchSize; i++ {
if iter.Valid() && iter.Key().HasPrefix(prefix) {
keys = append(keys, iter.Key().Clone())
@@ -188,6 +191,7 @@ func (d *ddl) delKeysWithPrefix(prefix kv.Key, jobType JobType) error {
}
}

log.Infof("[ddl] delete %v keys with prefix %q", len(keys), prefix)
for _, key := range keys {
err := txn.Delete(key)
// must skip ErrNotExist