Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions memory/table_editor.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ func (t *tableEditor) SetAutoIncrementValue(ctx *sql.Context, val uint64) error
return nil
}

func (t *tableEditor) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
// TODO: Add concurrency tests for AutoIncrement locking modes.
return func() {}, nil
}

func (t *tableEditor) PreciseMatch() bool {
return true
}
Expand Down
5 changes: 5 additions & 0 deletions sql/fulltext/multi_editor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ func (editor MultiTableEditor) SetAutoIncrementValue(ctx *sql.Context, u uint64)
return editor.primary.(sql.AutoIncrementSetter).SetAutoIncrementValue(ctx, u)
}

func (editor MultiTableEditor) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
// TODO: Add concurrency tests for AutoIncrement locking modes.
return func() {}, nil
}

// Close implements the interface sql.TableEditor.
func (editor MultiTableEditor) Close(ctx *sql.Context) error {
var err error
Expand Down
5 changes: 5 additions & 0 deletions sql/plan/foreign_key_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type ForeignKeyHandler struct {
AllUpdaters []sql.ForeignKeyEditor
}

func (n *ForeignKeyHandler) Underlying() sql.Table {
return n.Table
}

var _ sql.Node = (*ForeignKeyHandler)(nil)
var _ sql.CollationCoercible = (*ForeignKeyHandler)(nil)
var _ sql.Table = (*ForeignKeyHandler)(nil)
Expand All @@ -41,6 +45,7 @@ var _ sql.TableEditor = (*ForeignKeyHandler)(nil)
var _ sql.RowInserter = (*ForeignKeyHandler)(nil)
var _ sql.RowUpdater = (*ForeignKeyHandler)(nil)
var _ sql.RowDeleter = (*ForeignKeyHandler)(nil)
var _ sql.TableWrapper = (*ForeignKeyHandler)(nil)

// Resolved implements the interface sql.Node.
func (n *ForeignKeyHandler) Resolved() bool {
Expand Down
23 changes: 23 additions & 0 deletions sql/rowexec/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rowexec

import (
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -54,7 +55,28 @@ func (b *BaseBuilder) buildInsertInto(ctx *sql.Context, ii *plan.InsertInto, row
return nil, err
}

var unlocker func()
insertExpressions := getInsertExpressions(ii.Source)
if ii.HasUnspecifiedAutoInc {
_, i, _ := sql.SystemVariables.GetGlobal("innodb_autoinc_lock_mode")
lockMode, ok := i.(int64)
if !ok {
return nil, errors.New(fmt.Sprintf("unexpected type for innodb_autoinc_lock_mode, expected int64, got %T", i))
}
// Lock modes "traditional" (0) and "consecutive" (1) require that a single lock is held for the entire iteration.
// Lock mode "interleaved" (2) will acquire the lock only when inserting into the table.
if lockMode != 2 {
autoIncrementable, ok := sql.GetUnderlyingTable(insertable).(sql.AutoIncrementTable)
if !ok {
return nil, errors.New("auto increment expression on non-AutoIncrement table. This should not be possible")
}

unlocker, err = autoIncrementable.AutoIncrementSetter(ctx).AcquireAutoIncrementLock(ctx)
if err != nil {
return nil, err
}
}
}
insertIter := &insertIter{
schema: dstSchema,
tableNode: ii.Destination,
Expand All @@ -63,6 +85,7 @@ func (b *BaseBuilder) buildInsertInto(ctx *sql.Context, ii *plan.InsertInto, row
updater: updater,
rowSource: rowIter,
hasAutoAutoIncValue: ii.HasUnspecifiedAutoInc,
unlocker: unlocker,
updateExprs: ii.OnDupExprs,
insertExprs: insertExpressions,
checks: ii.Checks(),
Expand Down
4 changes: 4 additions & 0 deletions sql/rowexec/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type insertIter struct {
rowSource sql.RowIter
lastInsertIdUpdated bool
hasAutoAutoIncValue bool
unlocker func()
ctx *sql.Context
insertExprs []sql.Expression
updateExprs []sql.Expression
Expand Down Expand Up @@ -257,6 +258,9 @@ func (i *insertIter) resolveValues(ctx *sql.Context, insertRow sql.Row) error {
func (i *insertIter) Close(ctx *sql.Context) error {
if !i.closed {
i.closed = true
if i.unlocker != nil {
i.unlocker()
}
var rsErr, iErr, rErr, uErr error
if i.rowSource != nil {
rsErr = i.rowSource.Close(ctx)
Expand Down
3 changes: 2 additions & 1 deletion sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Session interface {
// SetUserVariable sets the given user variable to the value given for this session, or creates it for this session.
SetUserVariable(ctx *Context, varName string, value interface{}, typ Type) error
// GetSessionVariable returns this session's value of the system variable with the given name.
// To access global scope, use sql.SystemVariables.GetGlobal instead.
GetSessionVariable(ctx *Context, sysVarName string) (interface{}, error)
// GetUserVariable returns this session's value of the user variable with the given name, along with its most
// appropriate type.
Expand All @@ -83,7 +84,7 @@ type Session interface {
// To access global scope, use sql.StatusVariables instead.
GetStatusVariable(ctx *Context, statVarName string) (interface{}, error)
// SetStatusVariable sets the value of the status variable with session scope with the given name.
// To access global scope, use sql.StatusVariables instead.
// To access global scope, use sql.StatusVariables.GetGlobal instead.
SetStatusVariable(ctx *Context, statVarName string, val interface{}) error
// GetAllStatusVariables returns a map of all status variables with session scope and their values.
// To access global scope, use sql.StatusVariables instead.
Expand Down
12 changes: 12 additions & 0 deletions sql/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ type TableWrapper interface {
Underlying() Table
}

func GetUnderlyingTable(t Table) Table {
if tw, ok := t.(TableWrapper); ok {
return GetUnderlyingTable(tw.Underlying())
}
return t
}

// MutableTableWrapper is a TableWrapper that can change its underlying table.
type MutableTableWrapper interface {
TableWrapper
Expand Down Expand Up @@ -332,6 +339,11 @@ type AutoIncrementTable interface {
type AutoIncrementSetter interface {
// SetAutoIncrementValue sets a new AUTO_INCREMENT value.
SetAutoIncrementValue(*Context, uint64) error

// AcquireAutoIncrementLock acquires (if necessary) an exclusive lock on generating auto-increment values for the underlying table.
// This is called when @@innodb_autoinc_lock_mode is set to 0 (traditional) or 1 (consecutive), in order to guarentee that insert
// operations get a consecutive range of generated ids. The function returns a callback to release the lock.
AcquireAutoIncrementLock(ctx *Context) (func(), error)
// Closer finalizes the set operation, persisting the result.
Closer
}
Expand Down
5 changes: 2 additions & 3 deletions sql/variables/system_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,9 +1029,8 @@ var systemVars = map[string]sql.SystemVariable{
Scope: sql.GetMysqlScope(sql.SystemVariableScope_Global),
Dynamic: false,
SetVarHintApplies: false,
// TODO: lower bound should be 0: https://github.com/dolthub/dolt/issues/7634
Type: types.NewSystemIntType("innodb_autoinc_lock_mode", 2, 2, false),
Default: int64(2),
Type: types.NewSystemIntType("innodb_autoinc_lock_mode", 0, 2, false),
Default: int64(2),
},
// Row locking is currently not supported. This variable is provided for 3p tools, and we always return the
// Lowest value allowed by MySQL, which is 1. If you attempt to set this value to anything other than 1, errors ensue.
Expand Down
7 changes: 5 additions & 2 deletions sql/variables/system_variables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,20 @@ func TestInitSystemVars(t *testing.T) {
{
varName: "innodb_autoinc_lock_mode",
varVal: 0,
err: sql.ErrInvalidSystemVariableValue,
},
{
varName: "innodb_autoinc_lock_mode",
varVal: 1,
err: sql.ErrInvalidSystemVariableValue,
},
{
varName: "innodb_autoinc_lock_mode",
varVal: 2,
},
{
varName: "innodb_autoinc_lock_mode",
varVal: 3,
err: sql.ErrInvalidSystemVariableValue,
},
}

for _, test := range tests {
Expand Down