Skip to content

Commit

Permalink
Optimist: return ConflictNone if a conflict DDL has no conflict with …
Browse files Browse the repository at this point in the history
…at lease one normal table (#5414)

ref #4287
  • Loading branch information
GMHDBJD authored May 17, 2022
1 parent c8e7fff commit 90e26a2
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 51 deletions.
77 changes: 29 additions & 48 deletions dm/pkg/shardddl/optimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,8 +889,8 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab

log.L().Info("found conflict for DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable), log.ShortError(tableErr))

if idempotent {
log.L().Info("return conflict DDL for idempotent DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
if idempotent || l.noConflictWithOneNormalTable(source, schema, table, prevTable, postTable) {
log.L().Info("directly return conflict DDL", zap.Bool("idempotent", idempotent), zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
l.tables[source][schema][table] = postTable
l.finalTables[source][schema][table] = postTable
return true, ConflictNone
Expand All @@ -902,15 +902,6 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab
l.addConflictTable(source, schema, table, postTable)
l.finalTables[source][schema][table] = postTable

// if more than one conflict tables and this conflict DDL has no conflict with normal tables
// e.g. tb1,tb2 put ddl1(rename a to b); tb1 put ddl2(rename c to d); tb2 crash and reput ddl1(rename a to b)
// now tb2's ddl1 is a conflict DDL but has no conflict with normal tables
if l.multipleConflictTables() && l.noConflictWithNormalTables(source, schema, table, postTable) {
l.removeConflictTable(source, schema, table)
l.tables[source][schema][table] = postTable
return true, ConflictNone
}

// if any conflict happened between conflict DDLs, return error
// e.g. tb1: "ALTER TABLE RENAME a TO b", tb2: "ALTER TABLE RENAME c TO d"
if !l.noConflictForConflictTables() {
Expand Down Expand Up @@ -1085,33 +1076,39 @@ func (l *Lock) allFinalTableLarger() bool {
return l.allTableLarger(finalTables)
}

// judge whether a conflict DDL has no conflict with all normal tables.
func (l *Lock) noConflictWithNormalTables(source, schema, table string, postTable schemacmp.Table) bool {
// revert conflict tables and final tables
currentConflictTables := l.conflictTables
currentFinalTables := l.finalTables
defer func() {
l.conflictTables = currentConflictTables
l.finalTables = currentFinalTables
}()

// reset conflict tables and final tables
l.conflictTables = make(map[string]map[string]map[string]schemacmp.Table)
l.finalTables = make(map[string]map[string]map[string]schemacmp.Table)
// jude a conflict ddl is no conflict with at least one normal table.
func (l *Lock) noConflictWithOneNormalTable(callerSource, callerSchema, callerTable string, prevTable, postTable schemacmp.Table) bool {
for source, schemaTables := range l.tables {
l.finalTables[source] = make(map[string]map[string]schemacmp.Table)
for schema, tables := range schemaTables {
l.finalTables[source][schema] = make(map[string]schemacmp.Table)
for table, ti := range tables {
l.finalTables[source][schema][table] = ti
if source == callerSource && schema == callerSchema && table == callerTable {
continue
}

// judge joined no error
joined, err := postTable.Join(ti)
if err != nil {
continue
}

// judge this normal table is smaller(same as allTableSmaller)
if _, err = joined.Compare(prevTable); err == nil {
continue
}

// judge this normal table is larger(same as allTableLarger)
if joined, err = prevTable.Join(ti); err != nil {
joined = ti
}
if cmp, err := joined.Compare(postTable); err != nil || cmp < 0 {
continue
}

return true
}
}
}
// update for current conflict DDL
l.addConflictTable(source, schema, table, postTable)
l.finalTables[source][schema][table] = postTable

return l.noConflictForFinalTables()
return false
}

// judge whether all conflict tables has no conflict.
Expand Down Expand Up @@ -1202,19 +1199,3 @@ func (l *Lock) redirectForConflictTables(callerSource, callerSchema, callerTable
}
return nil
}

// multipleConflictTables check whether a lock has multiple conflict tables.
func (l *Lock) multipleConflictTables() bool {
cnt := 0
for _, schemaTables := range l.conflictTables {
for _, tables := range schemaTables {
for range tables {
cnt++
if cnt > 1 {
return true
}
}
}
}
return false
}
58 changes: 55 additions & 3 deletions dm/pkg/shardddl/optimism/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,7 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) {
// tb1: rename id to a
l.addConflictTable(source, schema, table1, t9)
l.finalTables[source][schema][table1] = t9
c.Assert(l.noConflictWithNormalTables(source, schema, table1, t1), IsFalse)
c.Assert(l.noConflictWithOneNormalTable(source, schema, table1, t1, t9), IsFalse)
c.Assert(l.allConflictTableSmaller(), IsTrue)
c.Assert(l.allConflictTableLarger(), IsTrue)
c.Assert(l.allFinalTableSmaller(), IsFalse)
Expand All @@ -2592,13 +2592,13 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) {
l.tables[source][schema][table2] = t0
l.addConflictTable(source, schema, table2, t1)
l.finalTables[source][schema][table2] = t1
c.Assert(l.noConflictWithNormalTables(source, schema, table2, t1), IsTrue)
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue)
l.removeConflictTable(source, schema, table2)
l.tables[source][schema][table2] = t1
// tb2: rename id to a
l.addConflictTable(source, schema, table2, t9)
l.finalTables[source][schema][table2] = t9
c.Assert(l.noConflictWithNormalTables(source, schema, table2, t9), IsFalse)
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t1, t9), IsFalse)
c.Assert(l.allConflictTableSmaller(), IsTrue)
c.Assert(l.allConflictTableLarger(), IsTrue)
c.Assert(l.allFinalTableSmaller(), IsTrue)
Expand All @@ -2614,6 +2614,58 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) {
c.Assert(l.tables[source][schema][table2], DeepEquals, t0)
}

func (t *testLock) TestNoConflictWithOneNormalTable(c *C) {
var (
source = "source"
schema = "schema"
table1 = "table1"
table2 = "table2"
p = parser.New()
se = mock.NewContext()
tblID int64 = 111
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, col int)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, new_col int)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, col varchar(4))`)
ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, new_col2 int)`)
ti4 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, b int, new_col int)`)
t0 = schemacmp.Encode(ti0)
t1 = schemacmp.Encode(ti1)
t2 = schemacmp.Encode(ti2)
t3 = schemacmp.Encode(ti3)
t4 = schemacmp.Encode(ti4)
)
l := &Lock{
tables: map[string]map[string]map[string]schemacmp.Table{
source: {
schema: {table1: t0, table2: t0},
},
},
}

// table1 nothing happened.
// table2 rename column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsFalse)

// mock table1 rename column already
l.tables[source][schema][table1] = t1
// table2 rename column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue)
// table2 modify column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t2), IsFalse)
// table2 different rename
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t3), IsFalse)

// mock table1 rename another column already
l.tables[source][schema][table1] = t4
// same results
// table2 rename column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue)
// table2 modify column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t2), IsFalse)
// table2 different rename
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t3), IsFalse)
}

func checkRedirectOp(c *C, task, source, schema, table string) bool {
ops, _, err := GetAllOperations(etcdTestCli)
c.Assert(err, IsNil)
Expand Down

0 comments on commit 90e26a2

Please sign in to comment.