Skip to content

Commit

Permalink
Merge branch 'master' into read-backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored Jun 5, 2021
2 parents ab6d396 + ce0e3c3 commit 6fb6d1d
Show file tree
Hide file tree
Showing 37 changed files with 1,466 additions and 179 deletions.
19 changes: 18 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,10 +1044,11 @@ func (w *worker) doModifyColumnTypeWithData(
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback",
zap.String("job", job.String()), zap.Error(err1))
return ver, errors.Trace(err)
}
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
job.State = model.JobStateRollingback
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
Expand Down Expand Up @@ -1109,8 +1110,24 @@ func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, oldColInfo, colIn
return w.writePhysicalTableRecord(t.(table.PhysicalTable), typeUpdateColumnWorker, nil, oldColInfo, colInfo, reorgInfo)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
// The channel is created without a buffer, so a send on it blocks until a receiver takes the value.
var TestReorgGoroutineRunning = make(chan interface{})

// updateColumnAndIndexes handles the modify column reorganization state for a table.
func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.ColumnInfo, idxes []*model.IndexInfo, reorgInfo *reorgInfo) error {
failpoint.Inject("mockInfiniteReorgLogic", func(val failpoint.Value) {
if val.(bool) {
a := new(interface{})
TestReorgGoroutineRunning <- a
for {
time.Sleep(30 * time.Millisecond)
if w.reorgCtx.isReorgCanceled() {
// Job is cancelled. So it can't be done.
failpoint.Return(errCancelledDDLJob)
}
}
}
})
// TODO: Support partition tables.
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
err := w.updatePhysicalTableRow(t.(table.PhysicalTable), oldCol, col, reorgInfo)
Expand Down
63 changes: 63 additions & 0 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package ddl_test
import (
"context"
"errors"
"strconv"
"sync"
"time"

. "github.com/pingcap/check"
Expand All @@ -42,12 +44,17 @@ import (
)

var _ = SerialSuites(&testColumnTypeChangeSuite{})
var _ = SerialSuites(&testCTCSerialSuiteWrapper{&testColumnTypeChangeSuite{}})

// testColumnTypeChangeSuite is the test suite for the column type change test cases.
// It carries the storage and domain handles that the test methods use.
type testColumnTypeChangeSuite struct {
// store is the kv storage that test cases pass to testkit.NewTestKit.
store kv.Storage
// dom is the domain; test cases reach the DDL instance through dom.DDL().
dom *domain.Domain
}

// testCTCSerialSuiteWrapper embeds *testColumnTypeChangeSuite, so the embedded suite's
// fields and methods are promoted onto the wrapper.
// NOTE(review): presumably this exists so the suite can be registered a second time as a serial suite — confirm.
type testCTCSerialSuiteWrapper struct {
*testColumnTypeChangeSuite
}

func (s *testColumnTypeChangeSuite) SetUpSuite(c *C) {
var err error
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
Expand Down Expand Up @@ -1884,6 +1891,62 @@ func (s *testColumnTypeChangeSuite) TestChangeIntToBitWillPanicInBackfillIndexes
tk.MustQuery("select * from t").Check(testkit.Rows("\x13 1 1.00", "\x11 2 2.00"))
}

// TestCancelCTCInReorgStateWillCauseGoroutineLeak checks that cancelling a
// column-type-change job while its reorg logic is running makes the ALTER
// statement return the "Cancelled DDL job" error instead of staying blocked.
//
// Close issue #24584
func (s *testColumnTypeChangeSuite) TestCancelCTCInReorgStateWillCauseGoroutineLeak(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	// Enable column change variable.
	tk.Se.GetSessionVars().EnableChangeColumnType = true

	// A failpoint that fails to switch on/off would silently invalidate the test,
	// so both results are asserted instead of being dropped.
	c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockInfiniteReorgLogic", `return(true)`), IsNil)
	defer func() {
		c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockInfiniteReorgLogic"), IsNil)
	}()

	// Restore the original ddl hook when the test finishes.
	originalHook := s.dom.DDL().GetHook()
	defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)

	tk.MustExec("drop table if exists ctc_goroutine_leak")
	tk.MustExec("create table ctc_goroutine_leak (a int)")
	tk.MustExec("insert into ctc_goroutine_leak values(1),(2),(3)")
	tbl := testGetTableByName(c, tk.Se, "test", "ctc_goroutine_leak")

	// The hook records the ID of the modify-column job on this table exactly once.
	hook := &ddl.TestDDLCallback{}
	var jobID int64
	hook.OnJobRunBeforeExported = func(job *model.Job) {
		if jobID != 0 {
			return
		}
		if tbl.Meta().ID != job.TableID {
			return
		}
		if job.Query == "alter table ctc_goroutine_leak modify column a tinyint" {
			jobID = job.ID
		}
	}
	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)

	tk1 := testkit.NewTestKit(c, s.store)
	tk1.MustExec("use test")
	// Enable column change variable.
	tk1.Se.GetSessionVars().EnableChangeColumnType = true
	var (
		wg       sync.WaitGroup
		alterErr error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// This ddl will hang in the failpoint loop, waiting for an outside cancel.
		_, alterErr = tk1.Exec("alter table ctc_goroutine_leak modify column a tinyint")
	}()
	// Wait until the failpoint signals that the reorg logic has been entered.
	<-ddl.TestReorgGoroutineRunning
	// Without a captured job ID the cancel below would target nothing and
	// wg.Wait() would block forever, so fail fast here.
	c.Assert(jobID, Not(Equals), int64(0))
	tk.MustExec("admin cancel ddl jobs " + strconv.FormatInt(jobID, 10))
	wg.Wait()
	// Guard against a nil error before dereferencing it for the message check.
	c.Assert(alterErr, NotNil)
	c.Assert(alterErr.Error(), Equals, "[ddl:8214]Cancelled DDL job")
}

// Close issue #24971, #24973, #24974
func (s *testColumnTypeChangeSuite) TestCTCShouldCastTheDefaultValue(c *C) {
tk := testkit.NewTestKit(c, s.store)
Expand Down
31 changes: 31 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3514,6 +3514,37 @@ out:
tk.MustExec("drop table tnn")
}

// TestVirtualColumnDDL creates a global temporary table with one virtual and one
// stored generated column, then checks the column metadata, the DESC output, and
// the values computed for an inserted row.
func (s *testDBSuite3) TestVirtualColumnDDL(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("set tidb_enable_global_temporary_table=true")
	tk.MustExec("use test")
	tk.MustExec("drop table if exists test_gv_ddl")
	tk.MustExec(`create global temporary table test_gv_ddl(a int, b int as (a+8) virtual, c int as (b + 2) stored) on commit delete rows;`)
	defer tk.MustExec("drop table if exists test_gv_ddl")
	is := tk.Se.(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema)
	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("test_gv_ddl"))
	c.Assert(err, IsNil)
	// Expected generated-column metadata, in column order (a, b, c).
	testCases := []struct {
		generatedExprString string
		generatedStored     bool
	}{
		{"", false},
		{"`a` + 8", false},
		{"`b` + 2", true},
	}
	columns := tbl.Meta().Columns
	// testCases is indexed by column position below; a length mismatch should be
	// reported as a failed assertion rather than an index-out-of-range panic or
	// an unchecked expectation.
	c.Assert(columns, HasLen, len(testCases))
	for i, column := range columns {
		c.Assert(column.GeneratedExprString, Equals, testCases[i].generatedExprString)
		c.Assert(column.GeneratedStored, Equals, testCases[i].generatedStored)
	}
	result := tk.MustQuery(`DESC test_gv_ddl`)
	result.Check(testkit.Rows(`a int(11) YES <nil> `, `b int(11) YES <nil> VIRTUAL GENERATED`, `c int(11) YES <nil> STORED GENERATED`))
	// The table is declared "on commit delete rows", so the row is inserted and
	// read back inside one explicit transaction.
	tk.MustExec("begin;")
	tk.MustExec("insert into test_gv_ddl values (1, default, default)")
	tk.MustQuery("select * from test_gv_ddl").Check(testkit.Rows("1 9 11"))
	_, err = tk.Exec("commit")
	c.Assert(err, IsNil)
}

func (s *testDBSuite3) TestGeneratedColumnDDL(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
34 changes: 34 additions & 0 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,40 @@ func (rc *reorgCtx) clean() {
rc.doneCh = nil
}

// runReorgJob is used as a portal to do the reorganization work.
// eg:
// 1: add index
// 2: alter column type
// 3: clean global index
//
// ddl goroutine >---------+
// ^ |
// | |
// | |
// | | <---(doneCh)--- f()
// HandleDDLQueue(...) | <---(regular timeout)
// | | <---(ctx done)
// | |
// | |
// A more ddl round <-----+
//
// How can we cancel reorg job?
//
// The background reorg keeps running unless it is stopped by one of several factors, for instance, ddl owner change,
// logic error (kv duplicate when insert index / cast error when alter column), ctx done, and cancel signal.
//
// When `admin cancel ddl jobs xxx` takes effect, we will give this kind of reorg ddl one more round,
// because we should pull the result out of doneCh; otherwise, the reorg worker will hang on the `f()` logic,
// which is a kind of goroutine leak.
//
// That's why we couldn't set the job to rollingback state directly in `convertJob2RollbackJob`, which is a
// cancelling portal for admin cancel action.
//
// In other words, the cancelling signal is informed from the bottom up, we set the atomic cancel variable
// in the cancelling portal to notify the lower worker goroutine, and fetch the cancel error from them in
// the additional ddl round.
//
// After that, we can make sure that the worker goroutine is correctly shut down.
func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error {
job := reorgInfo.Job
// This is for tests compatible, because most of the early tests try to build the reorg job manually
Expand Down
14 changes: 11 additions & 3 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,15 @@ func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredE
// Since modifying column job has two types: normal-type and reorg-type, we should handle it respectively.
// normal-type has only two states: None -> Public
// reorg-type has five states: None -> Delete-only -> Write-only -> Write-org -> Public
func rollingbackModifyColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
func rollingbackModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// If the value of SnapshotVer isn't zero, it means the reorg workers have been started.
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
// column type change workers are started. we have to ask them to exit.
logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String()))
w.reorgCtx.notifyReorgCancel()
// Give this kind of ddl one more round to run; the errCancelledDDLJob should be fetched from the bottom up.
return w.onModifyColumn(d, t, job)
}
_, tblInfo, oldCol, jp, err := getModifyColumnInfo(t, job)
if err != nil {
return ver, err
Expand Down Expand Up @@ -138,7 +146,7 @@ func rollingbackModifyColumn(t *meta.Meta, job *model.Job) (ver int64, err error
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
}
// The job has been in it's middle state and we roll it back.
// The job has been in its middle state (but the reorg worker hasn't started) and we roll it back here.
job.State = model.JobStateRollingback
return ver, errCancelledDDLJob
}
Expand Down Expand Up @@ -424,7 +432,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
case model.ActionTruncateTable:
ver, err = rollingbackTruncateTable(t, job)
case model.ActionModifyColumn:
ver, err = rollingbackModifyColumn(t, job)
ver, err = rollingbackModifyColumn(w, d, t, job)
case model.ActionRebaseAutoID, model.ActionShardRowID, model.ActionAddForeignKey,
model.ActionDropForeignKey, model.ActionRenameTable, model.ActionRenameTables,
model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition,
Expand Down
4 changes: 1 addition & 3 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,7 @@ func (s *testSuite1) TestAnalyzeTooLongColumns(c *C) {
}

func (s *testSuite1) TestAnalyzeIndexExtractTopN(c *C) {
if israce.RaceEnabled {
c.Skip("unstable, skip race test")
}
c.Skip("unstable, skip it and fix it before 20210618")
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer func() {
Expand Down
2 changes: 2 additions & 0 deletions executor/merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ func (s *testSerialSuite1) TestShuffleMergeJoinInDisk(c *C) {
c.Assert(tk.Se.GetSessionVars().StmtCtx.DiskTracker.MaxConsumed(), Greater, int64(0))
}
func (s *testSerialSuite1) TestMergeJoinInDisk(c *C) {
c.Skip("unstable, skip it and fix it before 20210618")

defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMUseTmpStorage = true
Expand Down
19 changes: 19 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1444,4 +1444,23 @@ func (s *testSerialSuite) TestSetTopSQLVariables(c *C) {
c.Assert(err.Error(), Equals, "[variable:1231]Variable 'tidb_top_sql_max_statement_count' can't be set to the value of '5001'")
tk.MustQuery("select @@global.tidb_top_sql_precision_seconds;").Check(testkit.Rows("2"))
c.Assert(variable.TopSQLVariable.MaxStatementCount.Load(), Equals, int64(2))

tk.MustExec("set @@tidb_top_sql_report_interval_seconds=10;")
tk.MustQuery("select @@tidb_top_sql_report_interval_seconds;").Check(testkit.Rows("10"))
c.Assert(variable.TopSQLVariable.ReportIntervalSeconds.Load(), Equals, int64(10))
_, err = tk.Exec("set @@tidb_top_sql_report_interval_seconds='abc';")
c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_top_sql_report_interval_seconds'")
_, err = tk.Exec("set @@tidb_top_sql_report_interval_seconds='5000';")
c.Assert(err.Error(), Equals, "[variable:1231]Variable 'tidb_top_sql_report_interval_seconds' can't be set to the value of '5000'")
tk.MustQuery("select @@tidb_top_sql_report_interval_seconds;").Check(testkit.Rows("10"))
c.Assert(variable.TopSQLVariable.ReportIntervalSeconds.Load(), Equals, int64(10))
tk.MustExec("set @@global.tidb_top_sql_report_interval_seconds=120;")
tk.MustQuery("select @@global.tidb_top_sql_report_interval_seconds;").Check(testkit.Rows("120"))
c.Assert(variable.TopSQLVariable.ReportIntervalSeconds.Load(), Equals, int64(120))
_, err = tk.Exec("set @@global.tidb_top_sql_report_interval_seconds='abc';")
c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_top_sql_report_interval_seconds'")
_, err = tk.Exec("set @@global.tidb_top_sql_report_interval_seconds='5000';")
c.Assert(err.Error(), Equals, "[variable:1231]Variable 'tidb_top_sql_report_interval_seconds' can't be set to the value of '5000'")
tk.MustQuery("select @@global.tidb_top_sql_report_interval_seconds;").Check(testkit.Rows("120"))
c.Assert(variable.TopSQLVariable.ReportIntervalSeconds.Load(), Equals, int64(120))
}
45 changes: 45 additions & 0 deletions expression/expr_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,51 @@ func (s *testEvaluatorSuite) TestExprPushDownToFlash(c *C) {
c.Assert(err, IsNil)
exprs = append(exprs, function)

// sqrt
function, err = NewFunction(mock.NewContext(), ast.Sqrt, types.NewFieldType(mysql.TypeDouble), realColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_CeilReal
function, err = NewFunction(mock.NewContext(), ast.Ceil, types.NewFieldType(mysql.TypeDouble), realColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_CeilIntToInt
function, err = NewFunction(mock.NewContext(), ast.Ceil, types.NewFieldType(mysql.TypeLonglong), intColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_CeilDecimalToInt
function, err = NewFunction(mock.NewContext(), ast.Ceil, types.NewFieldType(mysql.TypeLonglong), decimalColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_CeilDecimalToDecimal
function, err = NewFunction(mock.NewContext(), ast.Ceil, types.NewFieldType(mysql.TypeNewDecimal), decimalColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_FloorReal
function, err = NewFunction(mock.NewContext(), ast.Floor, types.NewFieldType(mysql.TypeDouble), realColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_FloorIntToInt
function, err = NewFunction(mock.NewContext(), ast.Floor, types.NewFieldType(mysql.TypeLonglong), intColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_FloorDecimalToInt
function, err = NewFunction(mock.NewContext(), ast.Floor, types.NewFieldType(mysql.TypeLonglong), decimalColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// ScalarFuncSig_FloorDecimalToDecimal
function, err = NewFunction(mock.NewContext(), ast.Floor, types.NewFieldType(mysql.TypeNewDecimal), decimalColumn)
c.Assert(err, IsNil)
exprs = append(exprs, function)

// Replace
function, err = NewFunction(mock.NewContext(), ast.Replace, types.NewFieldType(mysql.TypeString), stringColumn, stringColumn, stringColumn)
c.Assert(err, IsNil)
Expand Down
8 changes: 8 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,13 @@ func scalarExprSupportedByTiKV(sf *ScalarFunction) bool {

func scalarExprSupportedByFlash(function *ScalarFunction) bool {
switch function.FuncName.L {
case ast.Floor, ast.Ceil, ast.Ceiling:
switch function.Function.PbCode() {
case tipb.ScalarFuncSig_FloorIntToDec, tipb.ScalarFuncSig_CeilIntToDec:
return false
default:
return true
}
case
ast.LogicOr, ast.LogicAnd, ast.UnaryNot, ast.BitNeg, ast.Xor, ast.And, ast.Or,
ast.GE, ast.LE, ast.EQ, ast.NE, ast.LT, ast.GT, ast.In, ast.IsNull, ast.Like,
Expand All @@ -1004,6 +1011,7 @@ func scalarExprSupportedByFlash(function *ScalarFunction) bool {
ast.Concat, ast.ConcatWS,
ast.Year, ast.Month, ast.Day,
ast.DateDiff, ast.TimestampDiff, ast.DateFormat, ast.FromUnixTime,
ast.Sqrt,
ast.JSONLength:
return true
case ast.Substr, ast.Substring, ast.Left, ast.Right, ast.CharLength:
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/golang/protobuf v1.3.4
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf
github.com/google/btree v1.0.0
github.com/google/go-cmp v0.5.2 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/pprof v0.0.0-20200407044318-7d83b28da2e9
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4
Expand All @@ -43,12 +43,12 @@ require (
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb
github.com/pingcap/kvproto v0.0.0-20210602120243-804ac0a6ce21
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4
github.com/pingcap/tipb v0.0.0-20210603161937-cfb5a9225f95
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand All @@ -62,6 +62,7 @@ require (
github.com/uber-go/atomic v1.4.0
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b
go.uber.org/atomic v1.7.0
Expand Down
Loading

0 comments on commit 6fb6d1d

Please sign in to comment.