Skip to content

Commit

Permalink
ddl: save checkpoint with schemaVersion (#1033)
Browse files Browse the repository at this point in the history
* ddl: save checkpoint with schemaVersion

Co-authored-by: you06 <you1474600@gmail.com>
  • Loading branch information
3pointer and you06 authored Jan 11, 2021
1 parent 893b35c commit e28b75c
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 27 deletions.
7 changes: 5 additions & 2 deletions drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ type CheckPoint interface {
Load() error

// Save saves checkpoint information.
Save(commitTS int64, secondaryTS int64, consistent bool) error
Save(commitTS int64, secondaryTS int64, consistent bool, version int64) error

// TS gets checkpoint commit timestamp.
TS() int64

// SchemaVersion gets checkpoint current schemaversion.
SchemaVersion() int64

// IsConsistent return the Consistent status saved.
IsConsistent() bool

Expand All @@ -61,7 +64,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) {
return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", cfg.CheckpointType, cfg)
}

log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg))
log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Int64("version", cp.SchemaVersion()), zap.Reflect("cfg", cfg))

return cp, nil
}
14 changes: 13 additions & 1 deletion drainer/checkpoint/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type FileCheckPoint struct {

ConsistentSaved bool `toml:"consistent" json:"consistent"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
Version int64 `toml:"schema-version" json:"schema-version"`
}

// NewFile creates a new FileCheckpoint.
Expand Down Expand Up @@ -82,7 +83,7 @@ func (sp *FileCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool, version int64) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -92,6 +93,9 @@ func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {

sp.CommitTS = ts
sp.ConsistentSaved = consistent
if version > sp.Version {
sp.Version = version
}

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
Expand All @@ -116,6 +120,14 @@ func (sp *FileCheckPoint) TS() int64 {
return sp.CommitTS
}

// SchemaVersion implements CheckPoint.SchemaVersion interface.
func (sp *FileCheckPoint) SchemaVersion() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.Version
}

// IsConsistent implements CheckPoint interface
func (sp *FileCheckPoint) IsConsistent() bool {
sp.RLock()
Expand Down
6 changes: 3 additions & 3 deletions drainer/checkpoint/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ func (t *testCheckPointSuite) TestFile(c *C) {

testTs := int64(1)
// save ts
err = meta.Save(testTs, 0, false)
err = meta.Save(testTs, 0, false, 0)
c.Assert(err, IsNil)
// check ts
ts := meta.TS()
c.Assert(ts, Equals, testTs)
c.Assert(meta.IsConsistent(), Equals, false)

// check consistent true case.
err = meta.Save(testTs, 0, true)
err = meta.Save(testTs, 0, true, 0)
c.Assert(err, IsNil)
ts = meta.TS()
c.Assert(ts, Equals, testTs)
Expand Down Expand Up @@ -70,6 +70,6 @@ func (t *testCheckPointSuite) TestFile(c *C) {
err = meta.Close()
c.Assert(err, IsNil)
c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0, true)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0, true, 0)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed)
}
30 changes: 23 additions & 7 deletions drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package checkpoint

import (
"context"
"database/sql"
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -25,6 +27,7 @@ import (
// mysql driver
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/util"
)

// MysqlCheckPoint is a local savepoint struct for mysql
Expand All @@ -41,6 +44,7 @@ type MysqlCheckPoint struct {
ConsistentSaved bool `toml:"consistent" json:"consistent"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
Version int64 `toml:"schema-version" json:"schema-version"`
}

var _ CheckPoint = &MysqlCheckPoint{}
Expand Down Expand Up @@ -126,7 +130,7 @@ func (sp *MysqlCheckPoint) Load() error {
}

// Save implements checkpoint.Save interface
func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool, version int64) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -136,6 +140,9 @@ func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {

sp.CommitTS = ts
sp.ConsistentSaved = consistent
if version > sp.Version {
sp.Version = version
}

if secondaryTS > 0 {
sp.TsMap["primary-ts"] = ts
Expand All @@ -148,12 +155,13 @@ func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
}

sql := genReplaceSQL(sp, string(b))
_, err = sp.db.Exec(sql)
if err != nil {
return errors.Annotatef(err, "query sql failed: %s", sql)
}

return nil
return util.RetryContext(context.TODO(), 5, time.Second, 1, func(context.Context) error {
_, err = sp.db.Exec(sql)
if err != nil {
return errors.Annotatef(err, "query sql failed: %s", sql)
}
return nil
})
}

// IsConsistent implements CheckPoint interface
Expand All @@ -172,6 +180,14 @@ func (sp *MysqlCheckPoint) TS() int64 {
return sp.CommitTS
}

// SchemaVersion implements CheckPoint.SchemaVersion interface.
func (sp *MysqlCheckPoint) SchemaVersion() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.Version
}

// Close implements CheckPoint.Close interface
func (sp *MysqlCheckPoint) Close() error {
sp.Lock()
Expand Down
4 changes: 2 additions & 2 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) {
c.Assert(err, IsNil)
mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0))
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"}
err = cp.Save(1111, 0, false)
err = cp.Save(1111, 0, false, 0)
c.Assert(err, IsNil)
}

Expand All @@ -64,7 +64,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
table: "tbl",
TsMap: make(map[string]int64),
}
err = cp.Save(65536, 3333, false)
err = cp.Save(65536, 3333, false, 0)
c.Assert(err, IsNil)
c.Assert(cp.TsMap["primary-ts"], Equals, int64(65536))
c.Assert(cp.TsMap["secondary-ts"], Equals, int64(3333))
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
return errors.Trace(readerErr)
}

err := cp.Save(lastSuccessTS, 0 /* secondaryTS */, true /*consistent*/)
err := cp.Save(lastSuccessTS, 0 /* secondaryTS */, true /*consistent*/, 0)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var _ loader.Loader = &noOpLoader{}
func (s *relaySuite) TestFeedByRealyLog(c *check.C) {
cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/checkpoint")
c.Assert(err, check.IsNil)
err = cp.Save(0, 0, false)
err = cp.Save(0, 0, false, 0)
c.Assert(err, check.IsNil)
c.Assert(cp.IsConsistent(), check.Equals, false)

Expand Down
5 changes: 5 additions & 0 deletions drainer/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type Item struct {
Table string
RelayLogPos pb.Pos

// Each item has a schemaVersion. with amend txn feature the prewrite DML's SchemaVersion could change.
// which makes restart & reload history DDL with previous SchemaVersion not reliable.
// so we should save this version as checkpoint.
SchemaVersion int64

// the applied TS executed in downstream, only for tidb
AppliedTS int64
// should skip to replicate this item at downstream
Expand Down
28 changes: 18 additions & 10 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (s *Syncer) enableSafeModeInitializationPhase() {
func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
successes := s.dsyncer.Successes()
var lastSaveTS int64
var latestVersion int64
lastSaveTime := time.Now()

for {
Expand All @@ -208,6 +209,7 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
if ts > atomic.LoadInt64(lastTS) {
atomic.StoreInt64(lastTS, ts)
}
latestVersion = item.SchemaVersion

// save ASAP for DDL, and if FinishTS > 0, we should save the ts map
if item.Binlog.DdlJobId > 0 || item.AppliedTS > 0 {
Expand All @@ -229,7 +231,7 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
ts := atomic.LoadInt64(lastTS)
if ts > lastSaveTS {
if saveNow || time.Since(lastSaveTime) > 3*time.Second {
s.savePoint(ts, appliedTS)
s.savePoint(ts, appliedTS, latestVersion)
lastSaveTime = time.Now()
lastSaveTS = ts
appliedTS = 0
Expand All @@ -242,22 +244,22 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {

ts := atomic.LoadInt64(lastTS)
if ts > lastSaveTS {
s.savePoint(ts, 0)
s.savePoint(ts, 0, latestVersion)
eventCounter.WithLabelValues("savepoint").Add(1)
}

log.Info("handleSuccess quit")
}

func (s *Syncer) savePoint(ts, secondaryTS int64) {
func (s *Syncer) savePoint(ts, secondaryTS, version int64) {
if ts < s.cp.TS() {
log.Error("save ts is less than checkpoint ts %d", zap.Int64("save ts", ts), zap.Int64("checkpoint ts", s.cp.TS()))
}

log.Info("write save point", zap.Int64("ts", ts))
err := s.cp.Save(ts, secondaryTS, false)
log.Info("write save point", zap.Int64("ts", ts), zap.Int64("version", version))
err := s.cp.Save(ts, secondaryTS, false, version)
if err != nil {
log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Error(err))
log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Int64("version", version), zap.Error(err))
}

checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(ts))))
Expand All @@ -282,6 +284,12 @@ func (s *Syncer) run() error {

s.enableSafeModeInitializationPhase()

err = s.schema.handlePreviousDDLJobIfNeed(s.cp.SchemaVersion() + 1)
if err != nil {
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
return err
}

var lastDDLSchemaVersion int64
var b *binlogItem

Expand Down Expand Up @@ -354,6 +362,7 @@ ForLoop:
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
break ForLoop
}

var isFilterTransaction = false
var err1 error
if s.loopbackSync != nil && s.loopbackSync.LoopbackControl {
Expand All @@ -375,7 +384,7 @@ ForLoop:
s.addDMLEventMetrics(preWrite.GetMutations())
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite})
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite, SchemaVersion: preWrite.SchemaVersion})
if err != nil {
err = errors.Annotatef(err, "failed to add item")
break ForLoop
Expand All @@ -396,7 +405,6 @@ ForLoop:

log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion))
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Trace(err)
Expand Down Expand Up @@ -443,7 +451,7 @@ ForLoop:
log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed",
zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs))

err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip})
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip, SchemaVersion: lastDDLSchemaVersion})
if err != nil {
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
Expand Down Expand Up @@ -473,7 +481,7 @@ ForLoop:
return cerr
}

return s.cp.Save(s.cp.TS(), 0, true /*consistent*/)
return s.cp.Save(s.cp.TS(), 0, true /*consistent*/, lastDDLSchemaVersion)
}

func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) {
Expand Down

0 comments on commit e28b75c

Please sign in to comment.