Skip to content

Commit

Permalink
*: support adding/dropping the primary key by a configuration (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and bb7133 committed Nov 14, 2019
1 parent b5bb7fe commit df1baca
Show file tree
Hide file tree
Showing 25 changed files with 1,077 additions and 438 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Config struct {
Plugin Plugin `toml:"plugin" json:"plugin"`
PessimisticTxn PessimisticTxn `toml:"pessimistic-txn" json:"pessimistic-txn"`
CheckMb4ValueInUTF8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"`
// AlterPrimaryKey is used to control alter primary key feature.
AlterPrimaryKey bool `toml:"alter-primary-key" json:"alter-primary-key"`
// TreatOldVersionUTF8AsUTF8MB4 is use to treat old version table/column UTF8 charset as UTF8MB4. This is for compatibility.
// Currently not support dynamic modify, because this need to reload all old version schema.
TreatOldVersionUTF8AsUTF8MB4 bool `toml:"treat-old-version-utf8-as-utf8mb4" json:"treat-old-version-utf8-as-utf8mb4"`
Expand Down Expand Up @@ -433,6 +435,7 @@ var defaultConf = Config{
EnableStreaming: false,
EnableBatchDML: false,
CheckMb4ValueInUTF8: true,
AlterPrimaryKey: false,
TreatOldVersionUTF8AsUTF8MB4: true,
EnableTableLock: false,
DelayCleanTableLock: 0,
Expand Down
5 changes: 5 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ delay-clean-table-lock = 0
# Maximum number of the splitting region, which is used by the split region statement.
split-region-max-num = 1000

# alter-primary-key is used to control alter primary key feature. Default is false, indicate the alter primary key feature is disabled.
# If it is true, we can add the primary key by "alter table", but we may not be able to drop the primary key.
# In order to support "drop primary key" operation , this flag must be true and the table does not have the pkIsHandle flag.
alter-primary-key = false

[log]
# Log level: debug, info, warn, error, fatal.
level = "info"
Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ unrecognized-option-test = true
_, err = f.WriteString(`
token-limit = 0
enable-table-lock = true
alter-primary-key = true
delay-clean-table-lock = 5
split-region-max-num=10000
enable-batch-dml = true
Expand All @@ -201,6 +202,7 @@ max-sql-length=1024

// Test that the value will be overwritten by the config file.
c.Assert(conf.Performance.TxnTotalSizeLimit, Equals, uint64(2000))
c.Assert(conf.AlterPrimaryKey, Equals, true)

c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
c.Assert(conf.TiKVClient.MaxBatchSize, Equals, uint(128))
Expand Down
36 changes: 25 additions & 11 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,12 @@ func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.Colu
// Column from null to not null.
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag)
// Introduce the `mysql.HasPreventNullInsertFlag` flag to prevent users from inserting or updating null values.
err = modifyColumnFromNull2NotNull(w, t, dbInfo, tblInfo, job, oldCol, newCol)
// Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values.
err = modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, newCol.Name, oldCol.Tp != newCol.Tp)
if err != nil {
if ErrWarnDataTruncated.Equal(err) || errInvalidUseOfNull.Equal(err) {
job.State = model.JobStateRollingback
}
return ver, err
}
// The column should get into prevent null status first.
Expand Down Expand Up @@ -472,18 +475,26 @@ func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.Colu

// checkForNullValue ensure there are no null values of the column of this table.
// `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql.
func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, oldCol, newCol model.CIStr) error {
sql := fmt.Sprintf("select 1 from `%s`.`%s` where `%s` is null limit 1;", schema.L, table.L, oldCol.L)
func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, newCol model.CIStr, oldCols ...*model.ColumnInfo) error {
colsStr := ""
for i, col := range oldCols {
if i == 0 {
colsStr += "`" + col.Name.L + "` is null"
} else {
colsStr += " or `" + col.Name.L + "` is null"
}
}
sql := fmt.Sprintf("select 1 from `%s`.`%s` where %s limit 1;", schema.L, table.L, colsStr)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return errors.Trace(err)
}
rowCount := len(rows)
if rowCount != 0 {
if isDataTruncated {
return errInvalidUseOfNull
return ErrWarnDataTruncated.GenWithStackByArgs(newCol.L, rowCount)
}
return ErrWarnDataTruncated.GenWithStackByArgs(newCol.L, rowCount)
return errInvalidUseOfNull
}
return nil
}
Expand Down Expand Up @@ -552,8 +563,10 @@ func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.
return ver, nil
}

// modifyColumnFromNull2NotNull modifies the type definitions of 'null' to 'not null'.
func modifyColumnFromNull2NotNull(w *worker, t *meta.Meta, dbInfo *model.DBInfo, tblInfo *model.TableInfo, job *model.Job, oldCol, newCol *model.ColumnInfo) error {
// modifyColsFromNull2NotNull modifies the type definitions of 'null' to 'not null'.
// Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values.
func modifyColsFromNull2NotNull(w *worker, dbInfo *model.DBInfo, tblInfo *model.TableInfo, cols []*model.ColumnInfo,
newColName model.CIStr, isModifiedType bool) error {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
Expand All @@ -563,14 +576,15 @@ func modifyColumnFromNull2NotNull(w *worker, t *meta.Meta, dbInfo *model.DBInfo,
defer w.sessPool.put(ctx)

// If there is a null value inserted, it cannot be modified and needs to be rollback.
err = checkForNullValue(ctx, oldCol.Tp == newCol.Tp, dbInfo.Name, tblInfo.Name, oldCol.Name, newCol.Name)
err = checkForNullValue(ctx, isModifiedType, dbInfo.Name, tblInfo.Name, newColName, cols...)
if err != nil {
job.State = model.JobStateRollingback
return errors.Trace(err)
}

// Prevent this field from inserting null values.
tblInfo.Columns[oldCol.Offset].Flag |= mysql.PreventNullInsertFlag
for _, col := range cols {
col.Flag |= mysql.PreventNullInsertFlag
}
return nil
}

Expand Down
39 changes: 34 additions & 5 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ import (
var _ = Suite(&testStateChangeSuite{})

type testStateChangeSuite struct {
lease time.Duration
store kv.Storage
dom *domain.Domain
se session.Session
p *parser.Parser
lease time.Duration
store kv.Storage
dom *domain.Domain
se session.Session
p *parser.Parser
preSQL string
}

func (s *testStateChangeSuite) SetUpSuite(c *C) {
Expand Down Expand Up @@ -745,6 +746,16 @@ func (s *testStateChangeSuite) TestParallelAlterAddIndex(c *C) {
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

func (s *testStateChangeSuite) TestParallelAddPrimaryKey(c *C) {
sql1 := "ALTER TABLE t add primary key index_b(b);"
sql2 := "ALTER TABLE t add primary key index_b(c);"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[schema:1068]Multiple primary key defined")
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

func (s *testStateChangeSuite) TestParallelAlterAddPartition(c *C) {
sql1 := `alter table t_part add partition (
partition p2 values less than (30)
Expand Down Expand Up @@ -775,6 +786,20 @@ func (s *testStateChangeSuite) TestParallelDropIndex(c *C) {
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

func (s *testStateChangeSuite) TestParallelDropPrimaryKey(c *C) {
s.preSQL = "ALTER TABLE t add primary key index_b(c);"
defer func() {
s.preSQL = ""
}()
sql1 := "alter table t drop primary key;"
sql2 := "alter table t drop primary key;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1091]index PRIMARY doesn't exist")
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

func (s *testStateChangeSuite) TestParallelCreateAndRename(c *C) {
sql1 := "create table t_exists(c int);"
sql2 := "alter table t rename to t_exists;"
Expand All @@ -793,6 +818,10 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
defer s.se.Execute(context.Background(), "drop table t")

_, err = s.se.Execute(context.Background(), "drop database if exists t_part")
Expand Down
110 changes: 8 additions & 102 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ddl_test
import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -69,11 +68,6 @@ func setupIntegrationSuite(s *testIntegrationSuite, c *C) {
s.lease = 50 * time.Millisecond
ddl.WaitTimeWhenErrorOccured = 0

cfg := config.GetGlobalConfig()
newCfg := *cfg
newCfg.Log.SlowThreshold = 10000
config.StoreGlobalConfig(&newCfg)

s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.mvccStore = mocktikv.MustNewMVCCStore()
Expand Down Expand Up @@ -1138,19 +1132,14 @@ func (s *testIntegrationSuite3) TestMultiRegionGetTableEndHandle(c *C) {
// Split the table.
s.cluster.SplitTable(s.mvccStore, tblID, 100)

maxID, emptyTable := s.getMaxTableRowID(testCtx)
maxID, emptyTable := getMaxTableRowID(testCtx, s.store)
c.Assert(emptyTable, IsFalse)
c.Assert(maxID, Equals, int64(999))
c.Assert(maxID, Equals, int64(1000))

tk.MustExec("insert into t values(10000, 1000)")
maxID, emptyTable = s.getMaxTableRowID(testCtx)
c.Assert(emptyTable, IsFalse)
c.Assert(maxID, Equals, int64(10000))

tk.MustExec("insert into t values(-1, 1000)")
maxID, emptyTable = s.getMaxTableRowID(testCtx)
maxID, emptyTable = getMaxTableRowID(testCtx, s.store)
c.Assert(emptyTable, IsFalse)
c.Assert(maxID, Equals, int64(10000))
c.Assert(maxID, Equals, int64(1001))
}

type testMaxTableRowIDContext struct {
Expand All @@ -1167,107 +1156,24 @@ func newTestMaxTableRowIDContext(c *C, d ddl.DDL, tbl table.Table) *testMaxTable
}
}

func (s *testIntegrationSuite) getMaxTableRowID(ctx *testMaxTableRowIDContext) (int64, bool) {
func getMaxTableRowID(ctx *testMaxTableRowIDContext, store kv.Storage) (int64, bool) {
c := ctx.c
d := ctx.d
tbl := ctx.tbl
curVer, err := s.store.CurrentVersion()
curVer, err := store.CurrentVersion()
c.Assert(err, IsNil)
maxID, emptyTable, err := d.GetTableMaxRowID(curVer.Ver, tbl.(table.PhysicalTable))
c.Assert(err, IsNil)
return maxID, emptyTable
}

func (s *testIntegrationSuite) checkGetMaxTableRowID(ctx *testMaxTableRowIDContext, expectEmpty bool, expectMaxID int64) {
func checkGetMaxTableRowID(ctx *testMaxTableRowIDContext, store kv.Storage, expectEmpty bool, expectMaxID int64) {
c := ctx.c
maxID, emptyTable := s.getMaxTableRowID(ctx)
maxID, emptyTable := getMaxTableRowID(ctx, store)
c.Assert(emptyTable, Equals, expectEmpty)
c.Assert(maxID, Equals, expectMaxID)
}

func (s *testIntegrationSuite5) TestGetTableEndHandle(c *C) {
// TestGetTableEndHandle test ddl.GetTableMaxRowID method, which will return the max row id of the table.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("drop database if exists test_get_endhandle")
tk.MustExec("create database test_get_endhandle")
tk.MustExec("use test_get_endhandle")
// Test PK is handle.
tk.MustExec("create table t(a bigint PRIMARY KEY, b int)")

is := s.dom.InfoSchema()
d := s.dom.DDL()
tbl, err := is.TableByName(model.NewCIStr("test_get_endhandle"), model.NewCIStr("t"))
c.Assert(err, IsNil)

testCtx := newTestMaxTableRowIDContext(c, d, tbl)
// test empty table
s.checkGetMaxTableRowID(testCtx, true, int64(math.MaxInt64))

tk.MustExec("insert into t values(-1, 1)")
s.checkGetMaxTableRowID(testCtx, false, int64(-1))

tk.MustExec("insert into t values(9223372036854775806, 1)")
s.checkGetMaxTableRowID(testCtx, false, int64(9223372036854775806))

tk.MustExec("insert into t values(9223372036854775807, 1)")
s.checkGetMaxTableRowID(testCtx, false, int64(9223372036854775807))

tk.MustExec("insert into t values(10, 1)")
tk.MustExec("insert into t values(102149142, 1)")
s.checkGetMaxTableRowID(testCtx, false, int64(9223372036854775807))

tk.MustExec("create table t1(a bigint PRIMARY KEY, b int)")

for i := 0; i < 1000; i++ {
tk.MustExec(fmt.Sprintf("insert into t1 values(%v, %v)", i, i))
}
is = s.dom.InfoSchema()
testCtx.tbl, err = is.TableByName(model.NewCIStr("test_get_endhandle"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
s.checkGetMaxTableRowID(testCtx, false, int64(999))

// Test PK is not handle
tk.MustExec("create table t2(a varchar(255))")

is = s.dom.InfoSchema()
testCtx.tbl, err = is.TableByName(model.NewCIStr("test_get_endhandle"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
s.checkGetMaxTableRowID(testCtx, true, int64(math.MaxInt64))

for i := 0; i < 1000; i++ {
tk.MustExec(fmt.Sprintf("insert into t2 values(%v)", i))
}

result := tk.MustQuery("select MAX(_tidb_rowid) from t2")
maxID, emptyTable := s.getMaxTableRowID(testCtx)
result.Check(testkit.Rows(fmt.Sprintf("%v", maxID)))
c.Assert(emptyTable, IsFalse)

tk.MustExec("insert into t2 values(100000)")
result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
maxID, emptyTable = s.getMaxTableRowID(testCtx)
result.Check(testkit.Rows(fmt.Sprintf("%v", maxID)))
c.Assert(emptyTable, IsFalse)

tk.MustExec(fmt.Sprintf("insert into t2 values(%v)", math.MaxInt64-1))
result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
maxID, emptyTable = s.getMaxTableRowID(testCtx)
result.Check(testkit.Rows(fmt.Sprintf("%v", maxID)))
c.Assert(emptyTable, IsFalse)

tk.MustExec(fmt.Sprintf("insert into t2 values(%v)", math.MaxInt64))
result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
maxID, emptyTable = s.getMaxTableRowID(testCtx)
result.Check(testkit.Rows(fmt.Sprintf("%v", maxID)))
c.Assert(emptyTable, IsFalse)

tk.MustExec("insert into t2 values(100)")
result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
maxID, emptyTable = s.getMaxTableRowID(testCtx)
result.Check(testkit.Rows(fmt.Sprintf("%v", maxID)))
c.Assert(emptyTable, IsFalse)
}

func (s *testIntegrationSuite) getHistoryDDLJob(id int64) (*model.Job, error) {
var job *model.Job

Expand Down
Loading

0 comments on commit df1baca

Please sign in to comment.