Skip to content

Commit

Permalink
syncer(dm): init schemaTracker when syncer run (#6052)
Browse files Browse the repository at this point in the history
close #6014
  • Loading branch information
buchuitoudegou authored Jun 30, 2022
1 parent 98a3ad9 commit 4839e39
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 65 deletions.
96 changes: 68 additions & 28 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ func init() {

// Tracker is used to track schema locally.
type Tracker struct {
// we're using an embedded tidb, there's no need to sync operations on it, but we may recreate(drop and create)
// a table such as when checkpoint rollback, we need to make sure others(validator for now) can't see the table
// is deleted. so we add an extra layer of synchronization for GetTableInfo/RecreateTables for now.
// The Tracker is an embedded tidb in essence, where there was basically no parallel operation at the beginning.
// However, since the validator is introduced and heavily dependent on the Tracker, we need to make sure
// the synchronization between the reading from the validator and the modification from the syncer (e.g.
// when the checkpoint is being rolled back, we have to make sure the validator can still vision the original tables)
// From this point, we add an extra layer of the synchronization for the following operations:
// 1. GetTableInfo: the validator reads table infos.
// 2. Init: when the syncer restarts, it may re-initialize the Tracker while the validator may read the Tracker at the same time.
// 3. Close: Being similar as above, the validator can read the Tracker while the syncer is closing the Tracker.
sync.RWMutex
storePath string
store kv.Storage
Expand All @@ -99,16 +104,25 @@ type DownstreamTableInfo struct {
WhereHandle *sqlmodel.WhereHandle
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// NewTracker simply returns an empty Tracker,
// which should be followed by an initialization before used.
func NewTracker() *Tracker {
return &Tracker{}
}

// Init initializes the Tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// some variable from downstream using `downstreamConn`.
// NOTE **sessionCfg is a reference to caller**.
func NewTracker(
func (tr *Tracker) Init(
ctx context.Context,
task string,
sessionCfg map[string]string,
downstreamConn *dbconn.DBConn,
logger log.Logger,
) (*Tracker, error) {
) error {
if tr == nil {
return nil
}
var (
err error
storePath string
Expand Down Expand Up @@ -148,28 +162,28 @@ func NewTracker(
var ignoredColumn interface{}
rows, err2 := downstreamConn.QuerySQL(tctx, nil, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k))
if err2 != nil {
return nil, err2
return err2
}
if rows.Next() {
var value string
if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil {
return nil, err3
return err3
}
sessionCfg[k] = value
}
// nolint:sqlclosecheck
if err2 = rows.Close(); err2 != nil {
return nil, err2
return err2
}
if err2 = rows.Err(); err2 != nil {
return nil, err2
return err2
}
}
}

storePath, err = newTmpFolderForTracker(task)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "DeleteStorePath", Fn: func() {
_ = os.RemoveAll(storePath)
Expand All @@ -179,7 +193,7 @@ func NewTracker(
mockstore.WithStoreType(mockstore.EmbedUnistore),
mockstore.WithPath(storePath))
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseStore", Fn: func() {
_ = store.Close()
Expand All @@ -190,13 +204,13 @@ func NewTracker(

dom, err = session.BootstrapSession(store)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseDomain", Fn: dom.Close})

se, err = session.CreateSession(store)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseSession", Fn: se.Close})

Expand All @@ -216,13 +230,13 @@ func NewTracker(
log.L().Warn("can not set this variable", zap.Error(err))
continue
}
return nil, err
return err
}
}
for k, v := range globalVarsToSet {
err = se.GetSessionVars().SetSystemVarWithRelaxedValidation(k, v)
if err != nil {
return nil, err
return err
}
}
// skip DDL test https://github.com/pingcap/tidb/pull/33079
Expand All @@ -233,22 +247,40 @@ func NewTracker(
// exist by default. So we need to drop it first.
err = dom.DDL().DropSchema(se, model.NewCIStr("test"))
if err != nil {
return nil, err
return err
}

// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*DownstreamTableInfo),
}
tr.Lock()
defer tr.Unlock()
tr.storePath = storePath
tr.store = store
tr.dom = dom
tr.se = se
tr.dsTracker = dsTracker
tr.closed.Store(false)
return nil
}

return &Tracker{
storePath: storePath,
store: store,
dom: dom,
se: se,
dsTracker: dsTracker,
}, nil
// NewTestTracker creates an empty Tracker and initializes it subsequently.
// It is useful for test.
func NewTestTracker(
ctx context.Context,
task string,
sessionCfg map[string]string,
downstreamConn *dbconn.DBConn,
logger log.Logger,
) (*Tracker, error) {
tr := NewTracker()
err := tr.Init(ctx, task, sessionCfg, downstreamConn, logger)
if err != nil {
return nil, err
}
return tr, nil
}

func newTmpFolderForTracker(task string) (string, error) {
Expand Down Expand Up @@ -392,10 +424,18 @@ func (tr *Tracker) Close() error {
if !tr.closed.CAS(false, true) {
return nil
}
tr.se.Close()
tr.dom.Close()
if err := tr.store.Close(); err != nil {
return err
// Build of the Tracker and the initialization is divided.
// these fields can possibly be nil if the Tracker is closed before the initialization.
if tr.se != nil {
tr.se.Close()
}
if tr.dom != nil {
tr.dom.Close()
}
if tr.store != nil {
if err := tr.store.Close(); err != nil {
return err
}
}
return os.RemoveAll(tr.storePath)
}
Expand Down
40 changes: 20 additions & 20 deletions dm/pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,28 +92,28 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
// user give correct session config

t, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
t, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
err = t.Close()
c.Assert(err, IsNil)

// user give wrong session session, will return error
sessionCfg := map[string]string{"sql_mode": "HaHa"}
_, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
_, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, NotNil)

// discover session config failed, will return error
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "HaHa"))
_, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
_, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, NotNil)

// empty or default config in downstream
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", defaultTestSessionCfg["sql_mode"]))
tracker, err := NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
err = tracker.Exec(context.Background(), "", "create database testdb;")
Expand All @@ -125,7 +125,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE"))
tracker, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue)
Expand All @@ -144,7 +144,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
// user set session config, get tracker config from downstream
// no `STRICT_TRANS_TABLES`, no error now
sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"}
tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
err = tracker.Close()
c.Assert(err, IsNil)

tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand All @@ -203,7 +203,7 @@ func (s *trackerSuite) TestDDL(c *C) {
Name: "foo",
}

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -277,7 +277,7 @@ func (s *trackerSuite) TestDDL(c *C) {
func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -320,7 +320,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
func (s *trackerSuite) TestMultiDrop(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {
Name: "foo",
}

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -486,7 +486,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {

func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) {
log.SetLevel(zapcore.ErrorLevel)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -604,7 +604,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) {
log.SetLevel(zapcore.ErrorLevel)
ctx := context.Background()

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -700,7 +700,7 @@ func (s *trackerSuite) TestNotSupportedVariable(c *C) {
oldSessionVar := map[string]string{
"tidb_enable_change_column_type": "ON",
}
tracker, err := NewTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand All @@ -720,7 +720,7 @@ func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) {
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -757,7 +757,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -799,7 +799,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -891,7 +891,7 @@ func (s *trackerSuite) TestVarchar20000(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -931,7 +931,7 @@ func (s *trackerSuite) TestPlacementRule(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testCheckpointSuite) SetUpSuite(c *C) {
}
)

s.tracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L())
s.tracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L())
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -499,7 +499,7 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
dbConn, err := db.Conn(ctx)
require.NoError(t, err)
downstreamTrackConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
schemaTracker, err := schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L())
schemaTracker, err := schema.NewTestTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L())
require.NoError(t, err)
defer schemaTracker.Close() //nolint

Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/data_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func TestValidatorDoValidate(t *testing.T) {
dbConn, err := db.Conn(context.Background())
require.NoError(t, err)
syncerObj.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
syncerObj.schemaTracker, err = schema.NewTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L())
syncerObj.schemaTracker, err = schema.NewTestTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L())
defer syncerObj.schemaTracker.Close()
require.NoError(t, err)
require.NoError(t, syncerObj.schemaTracker.CreateSchemaIfNotExists(schemaName))
Expand Down
Loading

0 comments on commit 4839e39

Please sign in to comment.