Skip to content

Commit

Permalink
drainer: fix too much version when initializing table infos (#1237) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 2, 2023
1 parent 9ab2794 commit f883b1e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 18 deletions.
4 changes: 2 additions & 2 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type SyncerConfig struct {
EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"`
DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"`
LoadTableInfos bool `toml:"load-table-infos" json:"load-table-infos"`
LoadSchemaSnapshot bool `toml:"load-schema-snapshot" json:"load-schema-snapshot"`

// v2 filter rules
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
Expand Down Expand Up @@ -253,7 +253,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect")
fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality")
fs.BoolVar(&cfg.SyncerCfg.LoadTableInfos, "load-table-infos", false, "load table infos")
fs.BoolVar(&cfg.SyncerCfg.LoadSchemaSnapshot, "load-schema-snapshot", false, "init drainer schema info through pd meta interface, need to make sure checkpoint ts is not garbage collected in upstream")
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
Expand Down
4 changes: 2 additions & 2 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
defer tiStore.Close()

var jobs []*model.Job
if cfg.LoadTableInfos {
if cfg.LoadSchemaSnapshot {
jobs, err = loadTableInfos(tiStore, cp.TS())
} else {
jobs, err = loadHistoryDDLJobs(tiStore)
Expand Down Expand Up @@ -281,7 +281,7 @@ func (s *Server) Start() error {
}
})

if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadTableInfos {
if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadSchemaSnapshot {
s.tg.GoNoPanic("gc_safepoint", func() {
defer func() { go s.Close() }()
pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security)
Expand Down
27 changes: 13 additions & 14 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,22 @@ func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
if len(tableInfos) == 0 {
continue
}
for _, tableInfo := range tableInfos {
log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version))
jobs = append(jobs, mockCreateTableJob(tableInfo, dbinfo.ID, version))
version++
}
jobs = append(jobs, &model.Job{
Type: model.ActionCreateTables,
State: model.JobStateDone,
SchemaID: dbinfo.ID,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: version,
MultipleTableInfos: tableInfos,
},
})
version++
}
return jobs, nil
}
Expand Down Expand Up @@ -379,15 +390,3 @@ func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job {
},
}
}

func mockCreateTableJob(tableInfo *model.TableInfo, schemaID, schemaVersion int64) *model.Job {
return &model.Job{
Type: model.ActionCreateTable,
State: model.JobStateDone,
SchemaID: schemaID,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: schemaVersion,
TableInfo: tableInfo,
},
}
}

0 comments on commit f883b1e

Please sign in to comment.