Skip to content

Commit

Permalink
drainer: support load table infos to save memory and add gc safepoint…
Browse files Browse the repository at this point in the history
… update (#1233) (#1268)

ref #1137
  • Loading branch information
lichunzhu authored Oct 13, 2023
1 parent d334e20 commit 984d839
Show file tree
Hide file tree
Showing 18 changed files with 143 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ tools/bin/revive: tools/check/go.mod

tools/bin/golangci-lint: tools/check/go.mod
cd tools/check; \
GOBIN=$(CURDIR)/tools/bin $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint
GOBIN=$(CURDIR)/tools/bin $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.53.3
1 change: 0 additions & 1 deletion drainer/binlog_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func newBinlogItem(b *pb.Binlog, nodeID string) *binlogItem {
return itemp
}

//
func (b *binlogItem) SetJob(job *model.Job) {
b.job = job
}
2 changes: 2 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type SyncerConfig struct {
EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"`
DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"`
LoadSchemaSnapshot bool `toml:"load-schema-snapshot" json:"load-schema-snapshot"`

// v2 filter rules
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
Expand Down Expand Up @@ -252,6 +253,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect")
fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality")
fs.BoolVar(&cfg.SyncerCfg.LoadSchemaSnapshot, "load-schema-snapshot", false, "init drainer schema info through pd meta interface, need to make sure checkpoint ts is not garbage collected in upstream")
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
Expand Down
4 changes: 2 additions & 2 deletions drainer/loopbacksync/loopbacksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ var CreateMarkTableDDL string = fmt.Sprintf("CREATE TABLE If Not Exists %s (%s b
// CreateMarkDBDDL is DDL to create the database of mark table.
var CreateMarkDBDDL = "create database IF NOT EXISTS retl;"

//LoopBackSync loopback sync info
// LoopBackSync loopback sync info
type LoopBackSync struct {
ChannelID int64
LoopbackControl bool
SyncDDL bool
}

//NewLoopBackSyncInfo return LoopBackSyncInfo objec
// NewLoopBackSyncInfo return LoopBackSyncInfo objec
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync {
l := &LoopBackSync{
ChannelID: ChannelID,
Expand Down
49 changes: 49 additions & 0 deletions drainer/safepoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package drainer

import (
"context"
"fmt"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

const (
drainerServiceSafePointPrefix = "drainer"
defaultDrainerGCSafePointTTL = 5 * 60
)

func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, cpt checkpoint.CheckPoint, ttl int64) {
updateInterval := time.Duration(ttl/2) * time.Second
tick := time.NewTicker(updateInterval)
defer tick.Stop()
dumplingServiceSafePointID := fmt.Sprintf("%s_%d", drainerServiceSafePointPrefix, time.Now().UnixNano())
log.Info("generate drainer gc safePoint id", zap.String("id", dumplingServiceSafePointID))

for {
snapshotTS := uint64(cpt.TS())
log.Debug("update PD safePoint limit with ttl",
zap.Uint64("safePoint", snapshotTS),
zap.Int64("ttl", ttl))
for retryCnt := 0; retryCnt <= 10; retryCnt++ {
_, err := pdClient.UpdateServiceGCSafePoint(ctx, dumplingServiceSafePointID, ttl, snapshotTS)
if err == nil {
break
}
log.Debug("update PD safePoint failed", zap.Error(err), zap.Int("retryTime", retryCnt))
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}
}
select {
case <-ctx.Done():
return
case <-tick.C:
}
}
}
5 changes: 2 additions & 3 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,8 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,

log.Debug("Handle job", zap.Stringer("job", job))

sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
if job.Query == "" {
log.Warn("job query is empty", zap.Stringer("job", job))
}

switch job.Type {
Expand Down
6 changes: 0 additions & 6 deletions drainer/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,6 @@ func (t *schemaSuite) TestHandleDDL(c *C) {
c.Assert(err, IsNil)
c.Assert(sql, Equals, "")

// check job.Query is empty
job = &model.Job{ID: 1, State: model.JobStateDone}
_, _, sql, err = schema.handleDDL(job)
c.Assert(sql, Equals, "")
c.Assert(err, NotNil, Commentf("should return not found job.Query"))

// db info
dbInfo := &model.DBInfo{
ID: 2,
Expand Down
21 changes: 20 additions & 1 deletion drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tipb/go-binlog"
Expand Down Expand Up @@ -200,7 +201,12 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
}
defer tiStore.Close()

jobs, err := loadHistoryDDLJobs(tiStore)
var jobs []*model.Job
if cfg.LoadSchemaSnapshot {
jobs, err = loadTableInfos(tiStore, cp.TS())
} else {
jobs, err = loadHistoryDDLJobs(tiStore)
}
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -275,6 +281,19 @@ func (s *Server) Start() error {
}
})

if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadSchemaSnapshot {
s.tg.GoNoPanic("gc_safepoint", func() {
defer func() { go s.Close() }()
pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security)
if err != nil {
log.Error("fail to create pdCli", zap.Error(err))
errCh <- err
}
updateServiceSafePoint(s.ctx, pdCli, s.cp, defaultDrainerGCSafePointTTL)
pdCli.Close()
})
}

s.tg.GoNoPanic("collect", func() {
defer func() { go s.Close() }()
s.collector.Start(s.ctx)
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
2 changes: 1 addition & 1 deletion drainer/translator/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
55 changes: 55 additions & 0 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,45 @@ func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) {
return jobs, nil
}

// loadTableInfos loads all table infos after startTs
func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) {
meta := getSnapshotMetaFromTs(tiStore, startTs)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, errors.Trace(err)
}
jobs := make([]*model.Job, 0, len(dbinfos))
version := int64(1)
for _, dbinfo := range dbinfos {
log.L().Info("load db info", zap.Stringer("db", dbinfo.Name), zap.Int64("version", version))
jobs = append(jobs, mockCreateSchemaJob(dbinfo, version))
version++
}
for _, dbinfo := range dbinfos {
tableInfos, err := meta.ListTables(dbinfo.ID)
if err != nil {
return nil, errors.Trace(err)
}
if len(tableInfos) == 0 {
continue
}
for _, tableInfo := range tableInfos {
log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version))
}
jobs = append(jobs, &model.Job{
Type: model.ActionCreateTables,
State: model.JobStateDone,
SchemaID: dbinfo.ID,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: version,
MultipleTableInfos: tableInfos,
},
})
version++
}
return jobs, nil
}

func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
version, err := tiStore.CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
Expand All @@ -203,6 +242,11 @@ func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
return meta.NewSnapshotMeta(snapshot), nil
}

func getSnapshotMetaFromTs(tiStore kv.Storage, ts int64) *meta.Meta {
snapshot := tiStore.GetSnapshot(kv.NewVersion(uint64(ts)))
return meta.NewSnapshotMeta(snapshot)
}

func genDrainerID(listenAddr string) (string, error) {
urllis, err := url.Parse(listenAddr)
if err != nil {
Expand Down Expand Up @@ -334,3 +378,14 @@ func combineFilterRules(filterRules []*bf.BinlogEventRule) []*bf.BinlogEventRule
}
return rules
}

func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job {
return &model.Job{
Type: model.ActionCreateSchema,
State: model.JobStateDone,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: schemaVersion,
DBInfo: dbInfo,
},
}
}
4 changes: 2 additions & 2 deletions pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func Merge(v bool) Option {
}
}

//DestinationDBType set destDBType option.
// DestinationDBType set destDBType option.
func DestinationDBType(t string) Option {
destDBType := DBTypeUnknown
if t == "oracle" {
Expand All @@ -209,7 +209,7 @@ func DestinationDBType(t string) Option {
}
}

//SetloopBackSyncInfo set loop back sync info of loader
// SetloopBackSyncInfo set loop back sync info of loader
func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option {
return func(o *options) {
o.loopBackSyncInfo = loopBackSyncInfo
Expand Down
2 changes: 1 addition & 1 deletion pkg/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func CreateDB(user string, password string, host string, port int, tls *tls.Conf
return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil, time.Minute)
}

//CreateOracleDB create Oracle DB connection and return it
// CreateOracleDB create Oracle DB connection and return it
func CreateOracleDB(user string, password string, host string, port int, serviceName, connectString string) (db *gosql.DB, err error) {
loc, err := time.LoadLocation("Local")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func GetTidbPosition(db *sql.DB) (int64, error) {
return ts, nil
}

//GetOraclePosition return oracle scn
// GetOraclePosition return oracle scn
func GetOraclePosition(db *sql.DB) (int64, error) {
rows, err := db.Query("select dbms_flashback.get_system_change_number as current_scn from dual")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pump/storage/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newItemGenerator(txnNum int32, maxLatency int64, fakeTxnPerNum int32) <-cha
return items
}

/// sorter
// / sorter
type sortItem struct {
start int64
commit int64
Expand Down
6 changes: 3 additions & 3 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,9 @@ func (vlog *valueLog) scanRequests(start valuePointer, fn func(*request) error)

// scan visits binlogs in order starting from the specified position.
// There are two limitations to the usage of scan:
// 1. Binlogs added in new logFiles after scan starts are not visible, so don't assume
// that every single binlog added would be visited
// 2. If GC is running concurrently, logFiles may be closed and deleted, thus breaking the scanning.
// 1. Binlogs added in new logFiles after scan starts are not visible, so don't assume
// that every single binlog added would be visited
// 2. If GC is running concurrently, logFiles may be closed and deleted, thus breaking the scanning.
func (vlog *valueLog) scan(start valuePointer, fn func(vp valuePointer, record *Record) error) error {
vlog.gcLock.Lock()
defer vlog.gcLock.Unlock()
Expand Down

0 comments on commit 984d839

Please sign in to comment.