Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: save checkpoint with schemaVersion #1033

Merged
merged 11 commits into from
Jan 11, 2021
7 changes: 5 additions & 2 deletions drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ type CheckPoint interface {
Load() error

// Save saves checkpoint information.
Save(commitTS int64, secondaryTS int64, consistent bool) error
Save(commitTS int64, secondaryTS int64, consistent bool, version int64) error

// TS gets checkpoint commit timestamp.
TS() int64

// SchemaVersion gets checkpoint current schemaversion.
SchemaVersion() int64

// IsConsistent return the Consistent status saved.
IsConsistent() bool

Expand All @@ -61,7 +64,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) {
return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", cfg.CheckpointType, cfg)
}

log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg))
log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Int64("version", cp.SchemaVersion()), zap.Reflect("cfg", cfg))

return cp, nil
}
14 changes: 13 additions & 1 deletion drainer/checkpoint/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type FileCheckPoint struct {

ConsistentSaved bool `toml:"consistent" json:"consistent"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
Version int64 `toml:"schema-version" json:"schema-version"`
}

// NewFile creates a new FileCheckpoint.
Expand Down Expand Up @@ -82,7 +83,7 @@ func (sp *FileCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool, version int64) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -92,6 +93,9 @@ func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {

sp.CommitTS = ts
sp.ConsistentSaved = consistent
if version > sp.Version {
sp.Version = version
}

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
Expand All @@ -116,6 +120,14 @@ func (sp *FileCheckPoint) TS() int64 {
return sp.CommitTS
}

// SchemaVersion implements CheckPoint.SchemaVersion interface.
func (sp *FileCheckPoint) SchemaVersion() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.Version
}

// IsConsistent implements CheckPoint interface
func (sp *FileCheckPoint) IsConsistent() bool {
sp.RLock()
Expand Down
6 changes: 3 additions & 3 deletions drainer/checkpoint/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ func (t *testCheckPointSuite) TestFile(c *C) {

testTs := int64(1)
// save ts
err = meta.Save(testTs, 0, false)
err = meta.Save(testTs, 0, false, 0)
c.Assert(err, IsNil)
// check ts
ts := meta.TS()
c.Assert(ts, Equals, testTs)
c.Assert(meta.IsConsistent(), Equals, false)

// check consistent true case.
err = meta.Save(testTs, 0, true)
err = meta.Save(testTs, 0, true, 0)
c.Assert(err, IsNil)
ts = meta.TS()
c.Assert(ts, Equals, testTs)
Expand Down Expand Up @@ -70,6 +70,6 @@ func (t *testCheckPointSuite) TestFile(c *C) {
err = meta.Close()
c.Assert(err, IsNil)
c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0, true)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0, true, 0)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed)
}
30 changes: 23 additions & 7 deletions drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package checkpoint

import (
"context"
"database/sql"
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -25,6 +27,7 @@ import (
// mysql driver
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/util"
)

// MysqlCheckPoint is a local savepoint struct for mysql
Expand All @@ -41,6 +44,7 @@ type MysqlCheckPoint struct {
ConsistentSaved bool `toml:"consistent" json:"consistent"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
Version int64 `toml:"schema-version" json:"schema-version"`
}

var _ CheckPoint = &MysqlCheckPoint{}
Expand Down Expand Up @@ -126,7 +130,7 @@ func (sp *MysqlCheckPoint) Load() error {
}

// Save implements checkpoint.Save interface
func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool, version int64) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -136,6 +140,9 @@ func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {

sp.CommitTS = ts
sp.ConsistentSaved = consistent
if version > sp.Version {
sp.Version = version
}

if secondaryTS > 0 {
sp.TsMap["primary-ts"] = ts
Expand All @@ -148,12 +155,13 @@ func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
}

sql := genReplaceSQL(sp, string(b))
_, err = sp.db.Exec(sql)
if err != nil {
return errors.Annotatef(err, "query sql failed: %s", sql)
}

return nil
return util.RetryContext(context.TODO(), 5, time.Second, 1, func(context.Context) error {
_, err = sp.db.Exec(sql)
if err != nil {
return errors.Annotatef(err, "query sql failed: %s", sql)
}
return nil
})
}

// IsConsistent implements CheckPoint interface
Expand All @@ -172,6 +180,14 @@ func (sp *MysqlCheckPoint) TS() int64 {
return sp.CommitTS
}

// SchemaVersion implements CheckPoint.SchemaVersion interface.
func (sp *MysqlCheckPoint) SchemaVersion() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.Version
}

// Close implements CheckPoint.Close interface
func (sp *MysqlCheckPoint) Close() error {
sp.Lock()
Expand Down
4 changes: 2 additions & 2 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) {
c.Assert(err, IsNil)
mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0))
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"}
err = cp.Save(1111, 0, false)
err = cp.Save(1111, 0, false, 0)
c.Assert(err, IsNil)
}

Expand All @@ -64,7 +64,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
table: "tbl",
TsMap: make(map[string]int64),
}
err = cp.Save(65536, 3333, false)
err = cp.Save(65536, 3333, false, 0)
c.Assert(err, IsNil)
c.Assert(cp.TsMap["primary-ts"], Equals, int64(65536))
c.Assert(cp.TsMap["secondary-ts"], Equals, int64(3333))
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
return errors.Trace(readerErr)
}

err := cp.Save(lastSuccessTS, 0 /* secondaryTS */, true /*consistent*/)
err := cp.Save(lastSuccessTS, 0 /* secondaryTS */, true /*consistent*/, 0)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var _ loader.Loader = &noOpLoader{}
func (s *relaySuite) TestFeedByRealyLog(c *check.C) {
cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/checkpoint")
c.Assert(err, check.IsNil)
err = cp.Save(0, 0, false)
err = cp.Save(0, 0, false, 0)
c.Assert(err, check.IsNil)
c.Assert(cp.IsConsistent(), check.Equals, false)

Expand Down
5 changes: 5 additions & 0 deletions drainer/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type Item struct {
Table string
RelayLogPos pb.Pos

// Each item has a schemaVersion. with amend txn feature the prewrite DML's SchemaVersion could change.
// which makes restart & reload history DDL with previous SchemaVersion not reliable.
// so we should save this version as checkpoint.
SchemaVersion int64

// the applied TS executed in downstream, only for tidb
AppliedTS int64
// should skip to replicate this item at downstream
Expand Down
28 changes: 18 additions & 10 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (s *Syncer) enableSafeModeInitializationPhase() {
func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
successes := s.dsyncer.Successes()
var lastSaveTS int64
var latestVersion int64
lastSaveTime := time.Now()

for {
Expand All @@ -208,6 +209,7 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
if ts > atomic.LoadInt64(lastTS) {
atomic.StoreInt64(lastTS, ts)
}
latestVersion = item.SchemaVersion

// save ASAP for DDL, and if FinishTS > 0, we should save the ts map
if item.Binlog.DdlJobId > 0 || item.AppliedTS > 0 {
Expand All @@ -229,7 +231,7 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
ts := atomic.LoadInt64(lastTS)
if ts > lastSaveTS {
if saveNow || time.Since(lastSaveTime) > 3*time.Second {
s.savePoint(ts, appliedTS)
s.savePoint(ts, appliedTS, latestVersion)
lastSaveTime = time.Now()
lastSaveTS = ts
appliedTS = 0
Expand All @@ -242,22 +244,22 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {

ts := atomic.LoadInt64(lastTS)
if ts > lastSaveTS {
s.savePoint(ts, 0)
s.savePoint(ts, 0, latestVersion)
eventCounter.WithLabelValues("savepoint").Add(1)
}

log.Info("handleSuccess quit")
}

func (s *Syncer) savePoint(ts, secondaryTS int64) {
func (s *Syncer) savePoint(ts, secondaryTS, version int64) {
if ts < s.cp.TS() {
log.Error("save ts is less than checkpoint ts %d", zap.Int64("save ts", ts), zap.Int64("checkpoint ts", s.cp.TS()))
}

log.Info("write save point", zap.Int64("ts", ts))
err := s.cp.Save(ts, secondaryTS, false)
log.Info("write save point", zap.Int64("ts", ts), zap.Int64("version", version))
err := s.cp.Save(ts, secondaryTS, false, version)
if err != nil {
log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Error(err))
log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Int64("version", version), zap.Error(err))
}

checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(ts))))
Expand All @@ -282,6 +284,12 @@ func (s *Syncer) run() error {

s.enableSafeModeInitializationPhase()

err = s.schema.handlePreviousDDLJobIfNeed(s.cp.SchemaVersion() + 1)
if err != nil {
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
return err
}

var lastDDLSchemaVersion int64
var b *binlogItem

Expand Down Expand Up @@ -354,6 +362,7 @@ ForLoop:
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
break ForLoop
}

var isFilterTransaction = false
var err1 error
if s.loopbackSync != nil && s.loopbackSync.LoopbackControl {
Expand All @@ -375,7 +384,7 @@ ForLoop:
s.addDMLEventMetrics(preWrite.GetMutations())
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite})
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite, SchemaVersion: preWrite.SchemaVersion})
if err != nil {
err = errors.Annotatef(err, "failed to add item")
break ForLoop
Expand All @@ -396,7 +405,6 @@ ForLoop:

log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion))
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Trace(err)
Expand Down Expand Up @@ -443,7 +451,7 @@ ForLoop:
log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed",
zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs))

err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip})
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip, SchemaVersion: lastDDLSchemaVersion})
if err != nil {
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
Expand Down Expand Up @@ -473,7 +481,7 @@ ForLoop:
return cerr
}

return s.cp.Save(s.cp.TS(), 0, true /*consistent*/)
return s.cp.Save(s.cp.TS(), 0, true /*consistent*/, lastDDLSchemaVersion)
}

func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) {
Expand Down