Skip to content
This repository has been archived by the owner on Sep 21, 2022. It is now read-only.

Commit

Permalink
vreplication: fix row move bug
Browse files Browse the repository at this point in the history
If the target pk for a row changes (row move), then a simple update
may not do the right thing, especially for aggregates. In such cases,
we should delete and insert.

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
  • Loading branch information
sougou committed Mar 3, 2019
1 parent 31178f9 commit 8510abf
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 18 deletions.
52 changes: 36 additions & 16 deletions go/vt/vttablet/tabletmanager/vreplication/player_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func (tp *TablePlan) FindCol(name sqlparser.ColIdent) *ColExpr {
return nil
}

// GenerateStatement must be called only after Fields and PKCols have been populated.
func (tp *TablePlan) GenerateStatement(rowChange *binlogdatapb.RowChange) string {
// GenerateStatements must be called only after Fields and PKCols have been populated.
func (tp *TablePlan) GenerateStatements(rowChange *binlogdatapb.RowChange) []string {
// MakeRowTrusted is needed here because Proto3ToResult is not convenient.
var before, after []sqltypes.Value
if rowChange.Before != nil {
Expand All @@ -264,13 +264,33 @@ func (tp *TablePlan) GenerateStatement(rowChange *binlogdatapb.RowChange) string
case before == nil && after != nil:
query = tp.generateInsert(after)
case before != nil && after != nil:
pkChanged := false
for _, cExpr := range tp.PKCols {
if !valsEqual(before[cExpr.ColNum], after[cExpr.ColNum]) {
pkChanged = true
break
}
}
if pkChanged {
queries := make([]string, 0, 2)
if query := tp.generateDelete(before); query != "" {
queries = append(queries, query)
}
if query := tp.generateInsert(after); query != "" {
queries = append(queries, query)
}
return queries
}
query = tp.generateUpdate(before, after)
case before != nil && after == nil:
query = tp.generateDelete(before)
case before == nil && after == nil:
// unreachable
}
return query
if query != "" {
return []string{query}
}
return nil
}

func (tp *TablePlan) generateInsert(after []sqltypes.Value) string {
Expand Down Expand Up @@ -345,22 +365,10 @@ func (tp *TablePlan) generateUpdateValues(sql *sqlparser.TrackedBuffer, before,
if cExpr.Operation == OpCount {
continue
}
bef := before[cExpr.ColNum]
aft := after[cExpr.ColNum]
// If both are null, there's no change
if bef.IsNull() && aft.IsNull() {
continue
}
// If any one of them is null, something has changed.
if bef.IsNull() || aft.IsNull() {
goto mustSet
}
// Compare content only if none are null.
if bef.ToString() == aft.ToString() {
if valsEqual(before[cExpr.ColNum], after[cExpr.ColNum]) {
continue
}
}
mustSet:
sql.Myprintf("%s%v=", separator, cExpr.ColName)
separator = ", "
hasSet = true
Expand Down Expand Up @@ -407,3 +415,15 @@ func (tp *TablePlan) generateWhereValues(sql *sqlparser.TrackedBuffer, before []
encodeValue(sql, before[cExpr.ColNum])
}
}

func valsEqual(v1, v2 sqltypes.Value) bool {
if v1.IsNull() && v2.IsNull() {
return true
}
// If any one of them is null, something has changed.
if v1.IsNull() || v2.IsNull() {
return false
}
// Compare content only if none are null.
return v1.ToString() == v2.ToString()
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
return fmt.Errorf("unexpected event on table %s", rowEvent.TableName)
}
for _, change := range rowEvent.RowChanges {
if query := tplan.GenerateStatement(change); query != "" {
for _, query := range tplan.GenerateStatements(change) {
if err := vp.exec(ctx, query); err != nil {
return err
}
Expand Down
57 changes: 56 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ func TestPlayerFilters(t *testing.T) {
input: "update nopk set val='bbb' where id=1",
output: []string{
"begin",
"update nopk set val='bbb' where id=1 and val='aaa'",
"delete from nopk where id=1 and val='aaa'",
"insert into nopk set id=1, val='bbb'",
"/update _vt.vreplication set pos=",
"commit",
},
Expand Down Expand Up @@ -388,6 +389,60 @@ func TestPlayerUpdates(t *testing.T) {
}
}

func TestPlayerRowMove(t *testing.T) {
defer deleteTablet(addTablet(100, "0", topodatapb.TabletType_REPLICA, true, true))

execStatements(t, []string{
"create table src(id int, val1 int, val2 int, primary key(id))",
fmt.Sprintf("create table %s.dst(val1 int, sval2 int, rcount int, primary key(val1))", vrepldb),
})
defer execStatements(t, []string{
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "dst",
Filter: "select val1, sum(val2) as sval2, count(*) as rcount from src group by val1",
}},
}
cancel, _ := startVReplication(t, filter, binlogdatapb.OnDDLAction_IGNORE, "")
defer cancel()

execStatements(t, []string{
"insert into src values(1, 1, 1), (2, 2, 2), (3, 2, 3)",
})
expectDBClientQueries(t, []string{
"begin",
"insert into dst set val1=1, sval2=1, rcount=1 on duplicate key update sval2=sval2+1, rcount=rcount+1",
"insert into dst set val1=2, sval2=2, rcount=1 on duplicate key update sval2=sval2+2, rcount=rcount+1",
"insert into dst set val1=2, sval2=3, rcount=1 on duplicate key update sval2=sval2+3, rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
})
expectData(t, "dst", [][]string{
{"1", "1", "1"},
{"2", "5", "2"},
})

execStatements(t, []string{
"update src set val1=1, val2=4 where id=3",
})
expectDBClientQueries(t, []string{
"begin",
"update dst set sval2=sval2-3, rcount=rcount-1 where val1=2",
"insert into dst set val1=1, sval2=4, rcount=1 on duplicate key update sval2=sval2+4, rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
})
expectData(t, "dst", [][]string{
{"1", "5", "2"},
{"2", "2", "1"},
})
}

func TestPlayerTypes(t *testing.T) {
defer deleteTablet(addTablet(100, "0", topodatapb.TabletType_REPLICA, true, true))

Expand Down

0 comments on commit 8510abf

Please sign in to comment.