Skip to content

Commit

Permalink
Merge branch 'master' into TiDB-3014
Browse files Browse the repository at this point in the history
  • Loading branch information
xiekeyi98 authored Mar 18, 2019
2 parents 9fae8cd + 988ffd0 commit d39dad9
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 52 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ build:
# Install the check tools.
check-setup:tools/bin/revive tools/bin/goword tools/bin/gometalinter tools/bin/gosec

check: fmt errcheck lint tidy check-static
check: fmt errcheck lint tidy check-static vet

# These need to be fixed before they can be ran regularly
check-fail: goword check-slow
Expand Down Expand Up @@ -100,7 +100,7 @@ lint:tools/bin/revive

vet:
@echo "vet"
$(GO) vet -all -shadow $(PACKAGES) 2>&1 | $(FAIL_ON_STDOUT)
$(GO) vet -all $(PACKAGES) 2>&1 | $(FAIL_ON_STDOUT)

tidy:
@echo "go mod tidy"
Expand Down
15 changes: 8 additions & 7 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
w.waitSchemaChanged(nil, d, waitTime, schemaVer, job)
if job.IsSynced() {
if job.IsSynced() || job.IsCancelled() {
asyncNotify(d.ddlJobDoneCh)
}
}
Expand Down Expand Up @@ -536,15 +536,16 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

// Save errors in job, so that others can know errors happened.
if err != nil {
// If job is not cancelled, we should log this error.
if job.State != model.JobStateCancelled {
log.Errorf("[ddl-%s] run DDL job err %v", w, errors.ErrorStack(err))
} else {
job.Error = toTError(err)
job.ErrorCount++

// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
if job.State == model.JobStateCancelled {
log.Infof("[ddl-%s] the DDL job is normal to cancel because %v", w, err)
return ver, nil
}
log.Errorf("[ddl-%s] run DDL job err %v", w, errors.ErrorStack(err))

job.Error = toTError(err)
job.ErrorCount++
// Load global ddl variables.
if err1 := loadDDLVars(w); err1 != nil {
log.Errorf("[ddl-%s] load ddl global variable error: %v", w, err1)
Expand Down
32 changes: 32 additions & 0 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/admin"
Expand Down Expand Up @@ -469,3 +472,32 @@ func (s *testSerialSuite) TestCancelJobByErrorCountLimit(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}

func (s *testSerialSuite) TestCanceledJobTakeTime(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t_cjtt(a int)")

hook := &ddl.TestDDLCallback{}
once := sync.Once{}
hook.OnJobUpdatedExported = func(job *model.Job) {
once.Do(func() {
err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
return t.DropTableOrView(job.SchemaID, job.TableID, true)
})
c.Assert(err, IsNil)
})
}
origHook := s.dom.DDL().GetHook()
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
defer s.dom.DDL().(ddl.DDLForTest).SetHook(origHook)

originalWT := ddl.WaitTimeWhenErrorOccured
ddl.WaitTimeWhenErrorOccured = 1 * time.Second
defer func() { ddl.WaitTimeWhenErrorOccured = originalWT }()
startTime := time.Now()
assertErrorCode(c, tk, "alter table t_cjtt add column b int", mysql.ErrNoSuchTable)
sub := time.Since(startTime)
c.Assert(sub, Less, ddl.WaitTimeWhenErrorOccured)
}
46 changes: 23 additions & 23 deletions domain/schema_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func (*testSuite) TestSchemaValidator(c *C) {
delay := time.Duration(100+rand.Intn(900)) * time.Microsecond
time.Sleep(delay)
// Reload can run arbitrarily, at any time.
reload(validator, leaseGrantCh, 0)
item := <-leaseGrantCh
validator.Update(item.leaseGrantTS, item.oldVer, item.schemaVer, nil)
}

// Take a lease, check it's valid.
Expand All @@ -63,33 +64,24 @@ func (*testSuite) TestSchemaValidator(c *C) {
c.Assert(valid, Equals, ResultUnknown)
validator.Restart()

// Sleep for a long time, check schema is invalid.
<-oracleCh // Make sure that ts has timed out a lease.
time.Sleep(lease)
ts := <-oracleCh
// Increase the current time by 2 leases, check schema is invalid.
ts := uint64(time.Now().Add(2 * lease).UnixNano()) // Make sure that ts has timed out a lease.
valid = validator.Check(ts, item.schemaVer, []int64{10})
c.Assert(valid, Equals, ResultUnknown, Commentf("validator latest schema ver %v, time %v, item schema ver %v, ts %v",
validator.latestSchemaVer, validator.latestSchemaExpire, item.schemaVer, oracle.GetTimeFromTS(ts)))

currVer := reload(validator, leaseGrantCh, 0)
validator.latestSchemaVer, validator.latestSchemaExpire, 0, oracle.GetTimeFromTS(ts)))
// Make sure newItem's version is greater than item.schema.
newItem := getGreaterVersionItem(c, lease, leaseGrantCh, item.schemaVer)
currVer := newItem.schemaVer
validator.Update(newItem.leaseGrantTS, newItem.oldVer, currVer, nil)
valid = validator.Check(ts, item.schemaVer, nil)
c.Assert(valid, Equals, ResultFail, Commentf("currVer %d, newItem %v", currVer, item))
valid = validator.Check(ts, item.schemaVer, []int64{0})
c.Assert(valid, Equals, ResultFail, Commentf("currVer %d, newItem %v", currVer, item))
// Check the latest schema version must changed.
c.Assert(item.schemaVer, Less, validator.latestSchemaVer)

// Make sure newItem's version is bigger than currVer.
var newItem leaseGrantItem
for i := 0; i < 10; i++ {
time.Sleep(lease / 2)
newItem = <-leaseGrantCh
if newItem.schemaVer > currVer {
break
}
}
c.Assert(newItem.schemaVer, Greater, currVer, Commentf("currVer %d, newItem %v", currVer, newItem))

// Make sure newItem's version is greater than currVer.
newItem = getGreaterVersionItem(c, lease, leaseGrantCh, currVer)
// Update current schema version to newItem's version and the delta table IDs is 1, 2, 3.
validator.Update(ts, currVer, newItem.schemaVer, []int64{1, 2, 3})
// Make sure the updated table IDs don't be covered with the same schema version.
Expand All @@ -111,10 +103,18 @@ func (*testSuite) TestSchemaValidator(c *C) {
wg.Wait()
}

func reload(validator SchemaValidator, leaseGrantCh chan leaseGrantItem, ids ...int64) int64 {
item := <-leaseGrantCh
validator.Update(item.leaseGrantTS, item.oldVer, item.schemaVer, ids)
return item.schemaVer
func getGreaterVersionItem(c *C, lease time.Duration, leaseGrantCh chan leaseGrantItem, currVer int64) leaseGrantItem {
var newItem leaseGrantItem
for i := 0; i < 10; i++ {
time.Sleep(lease / 2)
newItem = <-leaseGrantCh
if newItem.schemaVer > currVer {
break
}
}
c.Assert(newItem.schemaVer, Greater, currVer, Commentf("currVer %d, newItem %v", currVer, newItem))

return newItem
}

// serverFunc plays the role as a remote server, runs in a separate goroutine.
Expand Down
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
execDetail := sessVars.StmtCtx.GetExecDetails()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryZapLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, sql))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryZapLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, sql))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d h1:rQ
github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mod h1:VKt7CNAQxpFpSDz3sXyj9hY/GbVsQCr0sB3w59nE7lU=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
Expand Down
2 changes: 1 addition & 1 deletion plugin/conn_ip_example/conn_ip_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
)

func ExampleLoadRunShutdownPlugin() {
func LoadRunShutdownPluginExample() {
ctx := context.Background()
var pluginVarNames []string
cfg := plugin.Config{
Expand Down
8 changes: 5 additions & 3 deletions table/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package table

import (
"context"
"strings"
"time"
"unicode/utf8"
Expand All @@ -34,8 +35,9 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/timeutil"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// Column provides meta data describing a table column.
Expand Down Expand Up @@ -139,7 +141,7 @@ func CastValues(ctx sessionctx.Context, rec []types.Datum, cols []*Column) (err
if err != nil {
if sc.DupKeyAsWarning {
sc.AppendWarning(err)
log.Warnf("cast values failed:%v", err)
logutil.Logger(context.Background()).Warn("CastValues failed", zap.Error(err))
} else {
return errors.Trace(err)
}
Expand All @@ -152,7 +154,7 @@ func CastValues(ctx sessionctx.Context, rec []types.Datum, cols []*Column) (err
func handleWrongUtf8Value(ctx sessionctx.Context, col *model.ColumnInfo, casted *types.Datum, str string, i int) (types.Datum, error) {
sc := ctx.GetSessionVars().StmtCtx
err := ErrTruncateWrongValue.FastGen("incorrect utf8 value %x(%s) for column %s", casted.GetBytes(), str, col.Name)
log.Errorf("con:%d %v", ctx.GetSessionVars().ConnectionID, err)
logutil.Logger(context.Background()).Error("incorrect UTF-8 value", zap.Uint64("conn", ctx.GetSessionVars().ConnectionID), zap.Error(err))
// Truncate to valid utf8 string.
truncateVal := types.NewStringDatum(str[:i])
err = sc.HandleTruncate(err)
Expand Down
16 changes: 9 additions & 7 deletions table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tables

import (
"bytes"
"context"
"fmt"
"sort"
"strings"
Expand All @@ -28,8 +29,9 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// Both partition and partitionedTable implement the table.Table interface.
Expand Down Expand Up @@ -154,7 +156,7 @@ func generatePartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error) {
exprs, err := expression.ParseSimpleExprsWithSchema(ctx, buf.String(), schema)
if err != nil {
// If it got an error here, ddl may hang forever, so this error log is important.
log.Error("wrong table partition expression:", errors.ErrorStack(err), buf.String())
logutil.Logger(context.Background()).Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err))
return nil, errors.Trace(err)
}
locateExprs = append(locateExprs, exprs[0])
Expand All @@ -172,14 +174,14 @@ func generatePartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error) {
}
}
if column == nil {
log.Warnf("partition pruning won't work on this expr:%s", partStr)
logutil.Logger(context.Background()).Warn("partition pruning not applicable", zap.String("expression", partStr))
}
}

exprs, err = expression.ParseSimpleExprsWithSchema(ctx, buf.String(), schema)
if err != nil {
// If it got an error here, ddl may hang forever, so this error log is important.
log.Error("wrong table partition expression:", errors.ErrorStack(err), buf.String())
logutil.Logger(context.Background()).Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err))
return nil, errors.Trace(err)
}
partitionPruneExprs = append(partitionPruneExprs, exprs[0])
Expand Down Expand Up @@ -207,7 +209,7 @@ func generateHashPartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error)
exprs, err := expression.ParseSimpleExprsWithSchema(ctx, buf.String(), schema)
if err != nil {
// If it got an error here, ddl may hang forever, so this error log is important.
log.Error("wrong table partition expression:", errors.ErrorStack(err), buf.String())
logutil.Logger(context.Background()).Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err))
return nil, errors.Trace(err)
}
partitionPruneExprs = append(partitionPruneExprs, exprs[0])
Expand All @@ -216,7 +218,7 @@ func generateHashPartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error)
exprs, err := expression.ParseSimpleExprsWithSchema(ctx, pi.Expr, schema)
if err != nil {
// If it got an error here, ddl may hang forever, so this error log is important.
log.Error("wrong table partition expression:", errors.ErrorStack(err), pi.Expr)
logutil.Logger(context.Background()).Error("wrong table partition expression", zap.String("expression", pi.Expr), zap.Error(err))
return nil, errors.Trace(err)
}
if col, ok := exprs[0].(*expression.Column); ok {
Expand Down Expand Up @@ -376,7 +378,7 @@ func (t *partitionedTable) UpdateRecord(ctx sessionctx.Context, h int64, currDat
// unlikely to happen in step2.
err = t.GetPartition(from).RemoveRecord(ctx, h, currData)
if err != nil {
log.Error("partition update record error, it may write dirty data to txn:", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("update partition record fails", zap.String("message", "new record inserted while old record is not removed"), zap.Error(err))
return errors.Trace(err)
}
return nil
Expand Down
11 changes: 6 additions & 5 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"github.com/spaolacci/murmur3"
"go.uber.org/zap"
)

// tableCommon is shared by both Table and partition.
Expand Down Expand Up @@ -109,7 +110,7 @@ func TableFromMeta(alloc autoid.Allocator, tblInfo *model.TableInfo) (table.Tabl

// Print some information when the column's offset isn't equal to i.
if colInfo.Offset != i {
log.Errorf("[tables] table %#v schema is wrong, no.%d col %#v, cols len %v", tblInfo, i, tblInfo.Columns[i], colsLen)
logutil.Logger(context.Background()).Error("wrong table schema", zap.Any("table", tblInfo), zap.Any("column", colInfo), zap.Int("index", i), zap.Int("offset", colInfo.Offset), zap.Int("columnNumber", colsLen))
}

col := table.ToColumn(colInfo)
Expand Down Expand Up @@ -783,14 +784,14 @@ func (t *tableCommon) removeRowIndices(ctx sessionctx.Context, h int64, rec []ty
for _, v := range t.DeletableIndices() {
vals, err := v.FetchValues(rec, nil)
if err != nil {
log.Infof("remove row index %v failed %v, txn %d, handle %d, data %v", v.Meta(), err, txn.StartTS(), h, rec)
logutil.Logger(context.Background()).Info("remove row index failed", zap.Any("index", v.Meta()), zap.Uint64("txnStartTS", txn.StartTS()), zap.Int64("handle", h), zap.Any("record", rec), zap.Error(err))
return errors.Trace(err)
}
if err = v.Delete(ctx.GetSessionVars().StmtCtx, txn, vals, h, txn); err != nil {
if v.Meta().State != model.StatePublic && kv.ErrNotExist.Equal(err) {
// If the index is not in public state, we may have not created the index,
// or already deleted the index, so skip ErrNotExist error.
log.Debugf("remove row index %v doesn't exist, txn %d, handle %d", v.Meta(), txn.StartTS(), h)
logutil.Logger(context.Background()).Debug("row index not exists", zap.Any("index", v.Meta()), zap.Uint64("txnStartTS", txn.StartTS()), zap.Int64("handle", h))
continue
}
return errors.Trace(err)
Expand Down Expand Up @@ -844,7 +845,7 @@ func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols
return nil
}

log.Debugf("startKey:%q, key:%q, value:%q", startKey, it.Key(), it.Value())
logutil.Logger(context.Background()).Debug("iterate records", zap.ByteString("startKey", startKey), zap.ByteString("key", it.Key()), zap.ByteString("value", it.Value()))

colMap := make(map[int64]*types.FieldType)
for _, col := range cols {
Expand Down
Loading

0 comments on commit d39dad9

Please sign in to comment.