drainer: support load table infos to save memory and add gc safepoint update (#1233) (#1271)

ref #1137
ti-chi-bot authored Nov 16, 2023
1 parent 6e8a30d commit 9dcfd1f
Showing 6 changed files with 128 additions and 10 deletions.
2 changes: 2 additions & 0 deletions drainer/config.go
@@ -122,6 +122,7 @@ type SyncerConfig struct {
 	EnableCausalityFlag  *bool `toml:"-" json:"enable-detect-flag"`
 	DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
 	EnableCausalityFile  *bool `toml:"enable-detect" json:"enable-detect"`
+	LoadSchemaSnapshot   bool  `toml:"load-schema-snapshot" json:"load-schema-snapshot"`
 
 	// v2 filter rules
 	CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
@@ -252,6 +253,7 @@ func NewConfig() *Config {
 	fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
 	fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect")
 	fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality")
+	fs.BoolVar(&cfg.SyncerCfg.LoadSchemaSnapshot, "load-schema-snapshot", false, "init drainer schema info through pd meta interface, need to make sure checkpoint ts is not garbage collected in upstream")
 	fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
 	fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
 	fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
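
A minimal sketch of how the new option might be enabled in drainer.toml, assuming the [syncer] section that SyncerConfig's TOML tags map to (the surrounding layout is illustrative, not taken from this commit):

[syncer]
# Rebuild schema info from a meta snapshot at the checkpoint TS instead of
# replaying the full DDL history. The checkpoint TS must not have been
# garbage-collected upstream.
load-schema-snapshot = true

The same switch is available on the command line as -load-schema-snapshot.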
49 changes: 49 additions & 0 deletions drainer/safepoint.go
@@ -0,0 +1,49 @@
+package drainer
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-binlog/drainer/checkpoint"
+	pd "github.com/tikv/pd/client"
+	"go.uber.org/zap"
+)
+
+const (
+	drainerServiceSafePointPrefix = "drainer"
+	defaultDrainerGCSafePointTTL  = 5 * 60
+)
+
+func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, cpt checkpoint.CheckPoint, ttl int64) {
+	updateInterval := time.Duration(ttl/2) * time.Second
+	tick := time.NewTicker(updateInterval)
+	defer tick.Stop()
+	dumplingServiceSafePointID := fmt.Sprintf("%s_%d", drainerServiceSafePointPrefix, time.Now().UnixNano())
+	log.Info("generate drainer gc safePoint id", zap.String("id", dumplingServiceSafePointID))
+
+	for {
+		snapshotTS := uint64(cpt.TS())
+		log.Debug("update PD safePoint limit with ttl",
+			zap.Uint64("safePoint", snapshotTS),
+			zap.Int64("ttl", ttl))
+		for retryCnt := 0; retryCnt <= 10; retryCnt++ {
+			_, err := pdClient.UpdateServiceGCSafePoint(ctx, dumplingServiceSafePointID, ttl, snapshotTS)
+			if err == nil {
+				break
+			}
+			log.Debug("update PD safePoint failed", zap.Error(err), zap.Int("retryTime", retryCnt))
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(time.Second):
+			}
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-tick.C:
+		}
+	}
+}
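
With defaultDrainerGCSafePointTTL = 5 * 60, this loop refreshes the service safe point every ttl/2 = 150 seconds, and the registration expires 300 seconds after the last successful update, so a crashed drainer can hold back upstream GC for at most about five minutes. A minimal calling sketch, assuming an already-constructed pd.Client (pdCli) and checkpoint.CheckPoint (cpt) — the real call site is in Server.Start below:

	// Hypothetical wiring: pdCli and cpt are assumed to exist already.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// updateServiceSafePoint blocks until ctx is cancelled, so it runs in its
	// own goroutine; it pins PD's GC safe point at the drainer checkpoint TS.
	go updateServiceSafePoint(ctx, pdCli, cpt, defaultDrainerGCSafePointTTL)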
5 changes: 2 additions & 3 deletions drainer/schema.go
@@ -341,9 +341,8 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
 
 	log.Debug("Handle job", zap.Stringer("job", job))
 
-	sql = job.Query
-	if sql == "" {
-		return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
+	if job.Query == "" {
+		log.Warn("job query is empty", zap.Stringer("job", job))
 	}
 
 	switch job.Type {
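
Behavior change worth noting: an empty job.Query used to be a hard error ("[ddl job sql miss]"), but the jobs synthesized by loadTableInfos below carry no SQL text, so handleDDL now logs a warning and continues. The schema_test.go hunk that follows removes the test case asserting the old error.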
6 changes: 0 additions & 6 deletions drainer/schema_test.go
@@ -286,12 +286,6 @@ func (t *schemaSuite) TestHandleDDL(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(sql, Equals, "")
 
-	// check job.Query is empty
-	job = &model.Job{ID: 1, State: model.JobStateDone}
-	_, _, sql, err = schema.handleDDL(job)
-	c.Assert(sql, Equals, "")
-	c.Assert(err, NotNil, Commentf("should return not found job.Query"))
-
 	// db info
 	dbInfo := &model.DBInfo{
 		ID: 2,
21 changes: 20 additions & 1 deletion drainer/server.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/store"
 	"github.com/pingcap/tidb/store/driver"
 	"github.com/pingcap/tipb/go-binlog"
@@ -200,7 +201,12 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
 	}
 	defer tiStore.Close()
 
-	jobs, err := loadHistoryDDLJobs(tiStore)
+	var jobs []*model.Job
+	if cfg.LoadSchemaSnapshot {
+		jobs, err = loadTableInfos(tiStore, cp.TS())
+	} else {
+		jobs, err = loadHistoryDDLJobs(tiStore)
+	}
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -275,6 +281,19 @@ func (s *Server) Start() error {
 		}
 	})
 
+	if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadSchemaSnapshot {
+		s.tg.GoNoPanic("gc_safepoint", func() {
+			defer func() { go s.Close() }()
+			pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security)
+			if err != nil {
+				log.Error("fail to create pdCli", zap.Error(err))
+				errCh <- err
+			}
+			updateServiceSafePoint(s.ctx, pdCli, s.cp, defaultDrainerGCSafePointTTL)
+			pdCli.Close()
+		})
+	}
+
 	s.tg.GoNoPanic("collect", func() {
 		defer func() { go s.Close() }()
 		s.collector.Start(s.ctx)
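
Two pieces of wiring here: createSyncer picks the snapshot loader over the full DDL-history replay when LoadSchemaSnapshot is set, and Server.Start additionally launches a gc_safepoint goroutine for that path, so the checkpoint TS the schema was rebuilt from is protected against upstream GC while drainer runs.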
55 changes: 55 additions & 0 deletions drainer/util.go
@@ -194,6 +194,45 @@ func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) {
 	return jobs, nil
 }
 
+// loadTableInfos loads all table infos after startTs
+func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) {
+	meta := getSnapshotMetaFromTs(tiStore, startTs)
+	dbinfos, err := meta.ListDatabases()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	jobs := make([]*model.Job, 0, len(dbinfos))
+	version := int64(1)
+	for _, dbinfo := range dbinfos {
+		log.L().Info("load db info", zap.Stringer("db", dbinfo.Name), zap.Int64("version", version))
+		jobs = append(jobs, mockCreateSchemaJob(dbinfo, version))
+		version++
+	}
+	for _, dbinfo := range dbinfos {
+		tableInfos, err := meta.ListTables(dbinfo.ID)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if len(tableInfos) == 0 {
+			continue
+		}
+		for _, tableInfo := range tableInfos {
+			log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version))
+		}
+		jobs = append(jobs, &model.Job{
+			Type:     model.ActionCreateTables,
+			State:    model.JobStateDone,
+			SchemaID: dbinfo.ID,
+			BinlogInfo: &model.HistoryInfo{
+				SchemaVersion:      version,
+				MultipleTableInfos: tableInfos,
+			},
+		})
+		version++
+	}
+	return jobs, nil
+}
+
 func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
 	version, err := tiStore.CurrentVersion(oracle.GlobalTxnScope)
 	if err != nil {
@@ -203,6 +242,11 @@ func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
 	return meta.NewSnapshotMeta(snapshot), nil
 }
 
+func getSnapshotMetaFromTs(tiStore kv.Storage, ts int64) *meta.Meta {
+	snapshot := tiStore.GetSnapshot(kv.NewVersion(uint64(ts)))
+	return meta.NewSnapshotMeta(snapshot)
+}
+
 func genDrainerID(listenAddr string) (string, error) {
 	urllis, err := url.Parse(listenAddr)
 	if err != nil {
@@ -334,3 +378,14 @@ func combineFilterRules(filterRules []*bf.BinlogEventRule) []*bf.BinlogEventRule
 	}
 	return rules
 }
+
+func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job {
+	return &model.Job{
+		Type:  model.ActionCreateSchema,
+		State: model.JobStateDone,
+		BinlogInfo: &model.HistoryInfo{
+			SchemaVersion: schemaVersion,
+			DBInfo:        dbInfo,
+		},
+	}
+}
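
This is where the memory saving comes from: rather than replaying every historical DDL job, loadTableInfos reads the meta snapshot at the checkpoint TS and synthesizes a minimal job list — one ActionCreateSchema job per database, then one ActionCreateTables job per database that actually has tables, with monotonically increasing schema versions. As a worked example (hypothetical names): a snapshot with database d1 holding tables t1 and t2, plus an empty d2, yields ActionCreateSchema jobs at versions 1 and 2, then a single ActionCreateTables job for d1 at version 3 carrying [t1, t2]; d2 is skipped by the len(tableInfos) == 0 check.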
