Skip to content

Commit

Permalink
schema storage: get the schema snap of commit ts -1 to support asyn…
Browse files Browse the repository at this point in the history
…c commit (#1513)
  • Loading branch information
leoppro authored Mar 17, 2021
1 parent 7dbd65d commit 8d4e5ab
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
4 changes: 3 additions & 1 deletion cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
PhysicalTableID: physicalTableID,
Delete: raw.OpType == model.OpTypeDelete,
}
snap, err := m.schemaStorage.GetSnapshot(ctx, raw.CRTs)
// when async commit is enabled, the commitTs of DMLs may be equals with DDL finishedTs
// a DML whose commitTs is equal to a DDL finishedTs using the schema info before the DDL
snap, err := m.schemaStorage.GetSnapshot(ctx, raw.CRTs-1)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/autorandom/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use `autorandom_test`;
CREATE TABLE table_a (
id BIGINT AUTO_RANDOM,
data int,
PRIMARY KEY(id)
PRIMARY KEY(id) clustered
);

INSERT INTO table_a (data) value (1);
INSERT INTO table_a (data) value (2);
INSERT INTO table_a (data) value (3);
INSERT INTO table_a (data) value (4);
INSERT INTO table_a (data) value (5);
INSERT INTO table_a (data) value (5);
22 changes: 22 additions & 0 deletions tests/multi_source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func main() {
log.S().Errorf("Failed to close source database: %s\n", err)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go switchAsyncCommit(ctx, sourceDB0)
util.MustExec(sourceDB0, "create database mark;")
runDDLTest([]*sql.DB{sourceDB0, sourceDB1})
util.MustExec(sourceDB0, "create table mark.finish_mark(a int primary key);")
Expand Down Expand Up @@ -109,6 +112,25 @@ func runDDLTest(srcs []*sql.DB) {
}
}

func switchAsyncCommit(ctx context.Context, db *sql.DB) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
enabled := false
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if enabled {
util.MustExec(db, "set global tidb_enable_async_commit = off")
} else {
util.MustExec(db, "set global tidb_enable_async_commit = on")
}
enabled = !enabled
}
}
}

func getFunctionName(i interface{}) string {
strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name(), ".")
return strs[len(strs)-1]
Expand Down

0 comments on commit 8d4e5ab

Please sign in to comment.