Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table: fix insert into _tidb_rowid panic and rebase it if needed (#22062) #22359

Merged
merged 4 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (w *worker) doModifyColumn(
func checkAndApplyNewAutoRandomBits(job *model.Job, t *meta.Meta, tblInfo *model.TableInfo,
newCol *model.ColumnInfo, oldName *model.CIStr, newAutoRandBits uint64) error {
schemaID := job.SchemaID
newLayout := autoid.NewAutoRandomIDLayout(&newCol.FieldType, newAutoRandBits)
newLayout := autoid.NewShardIDLayout(&newCol.FieldType, newAutoRandBits)

// GenAutoRandomID first to prevent concurrent update.
_, err := t.GenAutoRandomID(schemaID, tblInfo.ID, 1)
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ func setTableAutoRandomBits(ctx sessionctx.Context, tbInfo *model.TableInfo, col
return errors.Trace(err)
}

layout := autoid.NewAutoRandomIDLayout(col.Tp, autoRandBits)
layout := autoid.NewShardIDLayout(col.Tp, autoRandBits)
if autoRandBits == 0 {
return ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomNonPositive)
} else if autoRandBits > autoid.MaxAutoRandomBits {
Expand Down Expand Up @@ -2230,7 +2230,7 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
break
}
}
layout := autoid.NewAutoRandomIDLayout(&autoRandColTp, tbInfo.AutoRandomBits)
layout := autoid.NewShardIDLayout(&autoRandColTp, tbInfo.AutoRandomBits)
if layout.IncrementalMask()&newBase != newBase {
errMsg := fmt.Sprintf(autoid.AutoRandomRebaseOverflow, newBase, layout.IncrementalBitsCapacity())
return errors.Trace(ErrInvalidAutoRandom.GenWithStackByArgs(errMsg))
Expand Down
75 changes: 71 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (e *InsertValues) initInsertColumns() error {
return errors.Errorf("insert, update and replace statements for _tidb_rowid are not supported.")
}
e.hasExtraHandle = true
break
}
}

Expand Down Expand Up @@ -547,6 +546,13 @@ func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx
}
return d, nil
}
if column.ID == model.ExtraHandleID && hasValue {
d, err := e.adjustImplicitRowID(ctx, datum, hasValue, column)
if err != nil {
return types.Datum{}, err
}
return d, nil
}
if !hasValue {
d, err := e.getColDefaultValue(idx, column)
if e.handleErr(column, &datum, 0, err) != nil {
Expand All @@ -565,7 +571,14 @@ func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool) ([]types.Datum, error) {
gCols := make([]*table.Column, 0)
for i, c := range e.Table.Cols() {
tCols := e.Table.Cols()
if e.hasExtraHandle {
col := &table.Column{}
col.ColumnInfo = model.NewExtraHandleColInfo()
col.ColumnInfo.Offset = len(tCols)
tCols = append(tCols, col)
}
for i, c := range tCols {
var err error
// Evaluate the generated columns later after real columns set
if c.IsGenerated() {
Expand Down Expand Up @@ -877,7 +890,7 @@ func (e *InsertValues) allocAutoRandomID(fieldType *types.FieldType) (int64, err
if err != nil {
return 0, err
}
layout := autoid.NewAutoRandomIDLayout(fieldType, tableInfo.AutoRandomBits)
layout := autoid.NewShardIDLayout(fieldType, tableInfo.AutoRandomBits)
if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) {
return 0, autoid.ErrAutoRandReadFailed
}
Expand All @@ -893,12 +906,66 @@ func (e *InsertValues) rebaseAutoRandomID(recordID int64, fieldType *types.Field
alloc := e.Table.Allocators(e.ctx).Get(autoid.AutoRandomType)
tableInfo := e.Table.Meta()

layout := autoid.NewAutoRandomIDLayout(fieldType, tableInfo.AutoRandomBits)
layout := autoid.NewShardIDLayout(fieldType, tableInfo.AutoRandomBits)
autoRandomID := layout.IncrementalMask() & recordID

return alloc.Rebase(tableInfo.ID, autoRandomID, true)
}

func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
var err error
var recordID int64
if !hasValue {
d.SetNull()
}
if !d.IsNull() {
recordID = d.GetInt64()
}
// Use the value if it's not null and not 0.
if recordID != 0 {
if !e.ctx.GetSessionVars().AllowWriteRowID {
return types.Datum{}, errors.Errorf("insert, update and replace statements for _tidb_rowid are not supported.")
}
err = e.rebaseImplicitRowID(recordID)
if err != nil {
return types.Datum{}, err
}
d.SetInt64(recordID)
return d, nil
}
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
_, err := e.ctx.Txn(true)
if err != nil {
return types.Datum{}, errors.Trace(err)
}
intHandle, err := tables.AllocHandle(ctx, e.ctx, e.Table)
if err != nil {
return types.Datum{}, err
}
recordID = intHandle.IntValue()
}
err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c)
if err != nil {
return types.Datum{}, err
}
return d, nil
}

func (e *InsertValues) rebaseImplicitRowID(recordID int64) error {
if recordID < 0 {
return nil
}
alloc := e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType)
tableInfo := e.Table.Meta()

layout := autoid.NewShardIDLayout(types.NewFieldType(mysql.TypeLonglong), tableInfo.ShardRowIDBits)
newTiDBRowIDBase := layout.IncrementalMask() & recordID

return alloc.Rebase(tableInfo.ID, newTiDBRowIDBase, true)
}

func (e *InsertValues) handleWarning(err error) {
sc := e.ctx.GetSessionVars().StmtCtx
sc.AppendWarning(err)
Expand Down
49 changes: 49 additions & 0 deletions executor/rowid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,52 @@ func (s *testSuite1) TestNotAllowWriteRowID(c *C) {
// tk.MustExec("insert into tt (id, c, _tidb_rowid) values(30000,10,1);")
// tk.MustExec("admin check table tt;")
}

// Test for https://github.com/pingcap/tidb/issues/22029.
func (s *testSuite3) TestExplicitInsertRowID(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("drop database if exists test_explicit_insert_rowid;")
tk.MustExec("create database test_explicit_insert_rowid;")
tk.MustExec("use test_explicit_insert_rowid;")
tk.MustExec("create table t (a int);")
tk.MustExec("set @@tidb_opt_write_row_id = true;")

// Check that _tidb_rowid insertion success.
tk.MustExec("insert into t (_tidb_rowid, a) values (1, 1), (2, 2);")
tk.MustQuery("select *, _tidb_rowid from t;").Check(testkit.Rows("1 1", "2 2"))
// Test that explicitly insert _tidb_rowid will trigger rebase.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(id varchar(10), c int);")
tk.MustExec("insert t values('one', 101), ('two', 102);")
tk.MustQuery("select *, _tidb_rowid from t;").
Check(testkit.Rows("one 101 1", "two 102 2"))
// check that _tidb_rowid takes the values from the insert statement
tk.MustExec("insert t (id, c, _tidb_rowid) values ('three', 103, 9), ('four', 104, 16), ('five', 105, 5);")
tk.MustQuery("select *, _tidb_rowid from t where c > 102;").
Check(testkit.Rows("five 105 5", "three 103 9", "four 104 16"))
// check that the new _tidb_rowid is rebased
tk.MustExec("insert t values ('six', 106), ('seven', 107);")
tk.MustQuery("select *, _tidb_rowid from t where c > 105;").
Check(testkit.Rows("six 106 17", "seven 107 18"))

// Check that shard_row_id_bits are ignored during rebase.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int) shard_row_id_bits = 5;")
tk.MustExec("insert into t values (1);")
tk.MustQuery("select *, ((1 << (64-5-1)) - 1) & _tidb_rowid from t order by a;").Check(testkit.Rows("1 1"))
tk.MustExec("insert into t (a, _tidb_rowid) values (2, (1<<62)+5);")
tk.MustExec("insert into t values (3);")
tk.MustQuery("select *, ((1 << (64-5-1)) - 1) & _tidb_rowid from t order by a;").Check(testkit.Rows("1 1", "2 5", "3 6"))
tk.MustExec("insert into t (a, _tidb_rowid) values (4, null);")
tk.MustQuery("select *, ((1 << (64-5-1)) - 1) & _tidb_rowid from t order by a;").Check(testkit.Rows("1 1", "2 5", "3 6", "4 7"))

tk.MustExec("delete from t;")
tk.MustExec("SET sql_mode=(SELECT CONCAT(@@sql_mode,',NO_AUTO_VALUE_ON_ZERO'));")
tk.MustExec("insert into t (a, _tidb_rowid) values (5, 0);")
tk.MustQuery("select *, ((1 << (64-5-1)) - 1) & _tidb_rowid from t order by a;").Check(testkit.Rows("5 0"))
tk.MustExec("SET sql_mode=(SELECT REPLACE(@@sql_mode,'NO_AUTO_VALUE_ON_ZERO',''));")
tk.MustExec("insert into t (a, _tidb_rowid) values (6, 0);")
tk.MustQuery("select *, ((1 << (64-5-1)) - 1) & _tidb_rowid from t order by a;").Check(testkit.Rows("5 0", "6 8"))
tk.MustExec("insert into t (_tidb_rowid, a) values (0, 7);")
tk.MustQuery("select *, ((1 << (64-5-1)) - 1) & _tidb_rowid from t order by a;").Check(testkit.Rows("5 0", "6 8", "7 9"))
}
2 changes: 1 addition & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func rebaseAutoRandomValue(sctx sessionctx.Context, t table.Table, newData *type
if recordID < 0 {
return nil
}
layout := autoid.NewAutoRandomIDLayout(&col.FieldType, tableInfo.AutoRandomBits)
layout := autoid.NewShardIDLayout(&col.FieldType, tableInfo.AutoRandomBits)
// Set bits except incremental_bits to zero.
recordID = recordID & (1<<layout.IncrementalBits - 1)
return t.Allocators(sctx).Get(autoid.AutoRandomType).Rebase(tableInfo.ID, recordID, true)
Expand Down
28 changes: 12 additions & 16 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,17 +913,13 @@ func TestModifyBaseAndEndInjection(alloc Allocator, base, end int64) {
alloc.(*allocator).mu.Unlock()
}

// AutoRandomIDLayout is used to calculate the bits length of different section in auto_random id.
// The primary key with auto_random can only be `bigint` column, the total layout length of auto random is 64 bits.
// These are two type of layout:
// 1. Signed bigint:
// | [sign_bit] | [shard_bits] | [incremental_bits] |
// sign_bit(1 fixed) + shard_bits(15 max) + incremental_bits(the rest) = total_layout_bits(64 fixed)
// 2. Unsigned bigint:
// | [shard_bits] | [incremental_bits] |
// shard_bits(15 max) + incremental_bits(the rest) = total_layout_bits(64 fixed)
// Please always use NewAutoRandomIDLayout() to instantiate.
type AutoRandomIDLayout struct {
// ShardIDLayout is used to calculate the bits length of different segments in auto id.
// Generally, an auto id is consist of 3 segments: sign bit, shard bits and incremental bits.
// Take ``a BIGINT AUTO_INCREMENT PRIMARY KEY`` as an example, assume that the `shard_row_id_bits` = 5,
// the layout is like
// | [sign_bit] (1 bit) | [shard_bits] (5 bits) | [incremental_bits] (64-1-5=58 bits) |
// Please always use NewShardIDLayout() to instantiate.
type ShardIDLayout struct {
FieldType *types.FieldType
ShardBits uint64
// Derived fields.
Expand All @@ -932,15 +928,15 @@ type AutoRandomIDLayout struct {
HasSignBit bool
}

// NewAutoRandomIDLayout create an instance of AutoRandomIDLayout.
func NewAutoRandomIDLayout(fieldType *types.FieldType, shardBits uint64) *AutoRandomIDLayout {
// NewShardIDLayout create an instance of ShardIDLayout.
func NewShardIDLayout(fieldType *types.FieldType, shardBits uint64) *ShardIDLayout {
typeBitsLength := uint64(mysql.DefaultLengthOfMysqlTypes[mysql.TypeLonglong] * 8)
incrementalBits := typeBitsLength - shardBits
hasSignBit := !mysql.HasUnsignedFlag(fieldType.Flag)
if hasSignBit {
incrementalBits -= 1
}
return &AutoRandomIDLayout{
return &ShardIDLayout{
FieldType: fieldType,
ShardBits: shardBits,
TypeBitsLength: typeBitsLength,
Expand All @@ -950,11 +946,11 @@ func NewAutoRandomIDLayout(fieldType *types.FieldType, shardBits uint64) *AutoRa
}

// IncrementalBitsCapacity returns the max capacity of incremental section of the current layout.
func (l *AutoRandomIDLayout) IncrementalBitsCapacity() uint64 {
func (l *ShardIDLayout) IncrementalBitsCapacity() uint64 {
return uint64(math.Pow(2, float64(l.IncrementalBits))) - 1
}

// IncrementalMask returns 00..0[11..1], where [xxx] is the incremental section of the current layout.
func (l *AutoRandomIDLayout) IncrementalMask() int64 {
func (l *ShardIDLayout) IncrementalMask() int64 {
return (1 << l.IncrementalBits) - 1
}