Skip to content

Commit

Permalink
tracker(dm): close and recreate tracker when pause and resume (#5350) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 15, 2022
1 parent 8af519b commit 74ac962
Show file tree
Hide file tree
Showing 30 changed files with 333 additions and 166 deletions.
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ ErrSyncerReplaceEventNotExist,[code=36066:class=sync-unit:scope=internal:level=h
ErrSyncerParseDDL,[code=36067:class=sync-unit:scope=internal:level=high], "Message: parse DDL: %s, Workaround: Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed."
ErrSyncerUnsupportedStmt,[code=36068:class=sync-unit:scope=internal:level=high], "Message: `%s` statement not supported in %s mode"
ErrSyncerGetEvent,[code=36069:class=sync-unit:scope=upstream:level=high], "Message: get binlog event error: %v, Workaround: Please check if the binlog file could be parsed by `mysqlbinlog`."
ErrSyncerDownstreamTableNotFound,[code=36070:class=sync-unit:scope=internal:level=high], "Message: downstream table %s not found"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium], "Message: nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium], "Message: op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium], "Message: operate request without --sharding specified not valid"
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskCon
return st.Update(ctx, cfg)
}

// OperateSubTask stop/resume/pause sub task.
// OperateSubTask stop/resume/pause sub task.
func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error {
w.Lock()
defer w.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2026,6 +2026,12 @@ description = ""
workaround = "Please check if the binlog file could be parsed by `mysqlbinlog`."
tags = ["upstream", "high"]

[error.DM-sync-unit-36070]
message = "downstream table %s not found"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-master-38001]
message = "nil request not valid"
description = ""
Expand Down
20 changes: 16 additions & 4 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"strings"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/filter"
tidbConfig "github.com/pingcap/tidb/config"
Expand All @@ -35,7 +36,9 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
unistoreConfig "github.com/pingcap/tidb/store/mockstore/unistore/config"
"github.com/pingcap/tidb/types"
"go.uber.org/atomic"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand All @@ -60,13 +63,19 @@ var (
}
)

// init runs at package initialization and overrides two fields of unistore's
// package-level default configuration: Engine.VlogFileSize is set to 4 MiB and
// Engine.L1Size to 128 MiB.
// NOTE(review): presumably this keeps the embedded store used by the schema
// tracker small — confirm the motivation with the commit author.
func init() {
unistoreConfig.DefaultConf.Engine.VlogFileSize = 4 * units.MiB
unistoreConfig.DefaultConf.Engine.L1Size = 128 * units.MiB
}

// Tracker is used to track schema locally.
// Close shuts down se, dom and store; the closed flag guards Close so that
// only the first call does any work.
type Tracker struct {
// NOTE(review): presumably the directory backing store — confirm against the constructor.
storePath string
// store is the underlying kv storage; it is closed by Close.
store kv.Storage
// dom is the domain built on top of store; it is closed by Close.
dom *domain.Domain
// se is the session the tracker works through; it is closed by Close.
se session.Session
// dsTracker tracks downstream schema.
dsTracker *downstreamTracker
// closed is switched from false to true by Close with a compare-and-swap.
closed atomic.Bool
}

// downstreamTracker tracks downstream schema.
Expand Down Expand Up @@ -265,10 +274,7 @@ func (tr *Tracker) GetCreateTable(ctx context.Context, table *filter.Table) (str

row := req.GetRow(0)
str := row.GetString(1) // the first column is the table name.
// returned as single line.
str = strings.ReplaceAll(str, "\n", "")
str = strings.ReplaceAll(str, " ", " ")
return str, nil
return utils.CreateTableSQLToOneRow(str), nil
}

// AllSchemas returns all schemas visible to the tracker (excluding system tables).
Expand Down Expand Up @@ -348,6 +354,12 @@ func (tr *Tracker) Reset() error {

// Close closes a tracker.
func (tr *Tracker) Close() error {
if tr == nil {
return nil
}
if !tr.closed.CAS(false, true) {
return nil
}
tr.se.Close()
tr.dom.Close()
if err := tr.store.Close(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ const (
codeSyncerParseDDL
codeSyncerUnsupportedStmt
codeSyncerGetEvent
codeSyncerDownstreamTableNotFound
)

// DM-master error code.
Expand Down Expand Up @@ -1067,6 +1068,7 @@ var (
ErrSyncerParseDDL = New(codeSyncerParseDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "parse DDL: %s", "Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed.")
ErrSyncerUnsupportedStmt = New(codeSyncerUnsupportedStmt, ClassSyncUnit, ScopeInternal, LevelHigh, "`%s` statement not supported in %s mode", "")
ErrSyncerGetEvent = New(codeSyncerGetEvent, ClassSyncUnit, ScopeUpstream, LevelHigh, "get binlog event error: %v", "Please check if the binlog file could be parsed by `mysqlbinlog`.")
ErrSyncerDownstreamTableNotFound = New(codeSyncerDownstreamTableNotFound, ClassSyncUnit, ScopeInternal, LevelHigh, "downstream table %s not found", "")

// DM-master error.
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid", "")
Expand Down
7 changes: 7 additions & 0 deletions dm/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,10 @@ func GetTableCreateSQL(ctx context.Context, conn *sql.Conn, tableID string) (sql
}
return createStr, nil
}

// CreateTableSQLToOneRow formats the result of SHOW CREATE TABLE to one row.
//
// It first removes every newline and then replaces each pair of adjacent
// spaces with a single space, so the indentation that preceded a column or
// key definition shrinks to one separating space.
func CreateTableSQLToOneRow(sql string) string {
	sql = strings.ReplaceAll(sql, "\n", "")
	// The pattern must be TWO spaces: replacing one space with one space is a
	// no-op and would leave the indentation in the flattened statement.
	sql = strings.ReplaceAll(sql, "  ", " ")
	return sql
}
8 changes: 8 additions & 0 deletions dm/pkg/utils/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package utils
import (
"context"
"strconv"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -25,6 +26,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"

"github.com/pingcap/tiflow/dm/pkg/gtid"
)
Expand Down Expand Up @@ -458,3 +460,9 @@ func (t *testDBSuite) TestAddGSetWithPurged(c *C) {
c.Assert(originSet, DeepEquals, tc.originGSet)
}
}

func TestCreateTableSQLToOneRow(t *testing.T) {
input := "CREATE TABLE `t1` (\n `id` bigint(20) NOT NULL,\n `c1` varchar(20) DEFAULT NULL,\n `c2` varchar(20) DEFAULT NULL,\n PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */\n) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin"
expected := "CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin"
require.Equal(t, expected, CreateTableSQLToOneRow(input))
}
115 changes: 68 additions & 47 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (b *binlogPoint) flush() {
b.flushedTI = b.ti
}

func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) {
func (b *binlogPoint) rollback() {
b.Lock()
defer b.Unlock()

Expand All @@ -120,18 +120,16 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is

// NOTE: no `Equal` function for `model.TableInfo` exists now, so we compare `pointer` directly,
// and after a new DDL applied to the schema, the returned pointer of `model.TableInfo` changed now.
trackedTi, _ := schemaTracker.GetTableInfo(&filter.Table{Schema: schema, Name: b.ti.Name.O}) // ignore the returned error, only compare `trackerTi` is enough.
// may three versions of schema exist:
// - the one tracked in the TiDB-with-mockTiKV.
// there may be three versions of schema:
// - the one tracked in the schema tracker (TiDB-with-unistore).
// - the one in the checkpoint but not flushed.
// - the one in the checkpoint and flushed.
// if any of them are not equal, then we rollback them:
// schema tracker will be closed after task is paused, and it will load all schemas from checkpoint when task resumes.
// if the later two are not equal, then we rollback them:
// - set the one in the checkpoint but not flushed to the one flushed.
// - set the one tracked to the one in the checkpoint by the caller of this method (both flushed and not flushed are the same now)
if isSchemaChanged = (trackedTi != b.ti) || (b.ti != b.flushedTI); isSchemaChanged {
if b.ti != b.flushedTI {
b.ti = b.flushedTI
}
return
}

func (b *binlogPoint) outOfDate() bool {
Expand Down Expand Up @@ -237,6 +235,9 @@ type CheckPoint interface {
// TablePoint returns all table's stream checkpoint
TablePoint() map[string]map[string]binlog.Location

// GetTableInfo returns the saved table info from table checkpoint for the given table, return nil when not found
GetTableInfo(schema string, table string) *model.TableInfo

// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
// corresponding to Meta.Pos and gtid
FlushedGlobalPoint() binlog.Location
Expand All @@ -250,13 +251,16 @@ type CheckPoint interface {
GetFlushedTableInfo(table *filter.Table) *model.TableInfo

// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
Rollback(schemaTracker *schema.Tracker)
Rollback()

// String return text of global position
String() string

// CheckAndUpdate check the checkpoint data consistency and try to fix them if possible
CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error

// LoadIntoSchemaTracker loads table infos of all points into schema tracker.
LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error
}

// RemoteCheckPoint implements CheckPoint
Expand Down Expand Up @@ -596,10 +600,16 @@ func (cp *RemoteCheckPoint) FlushPointWithTableInfo(tctx *tcontext.Context, tabl
args := make([][]interface{}, 0, 10)
point := newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID)

if tablePoints, ok := cp.points[sourceSchema]; ok {
if p, ok2 := tablePoints[sourceTable]; ok2 {
point = p
}
tablePoints, ok := cp.points[sourceSchema]
if !ok {
tablePoints = map[string]*binlogPoint{}
cp.points[sourceSchema] = tablePoints
}
p, ok2 := tablePoints[sourceTable]
if ok2 {
point = p
} else {
tablePoints[sourceTable] = point
}

tiBytes, err := json.Marshal(ti)
Expand Down Expand Up @@ -676,6 +686,21 @@ func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]binlog.Location {
return tablePoint
}

// GetTableInfo implements CheckPoint.GetTableInfo.
// It returns the table info held by the table checkpoint of schema.table, or
// nil when no checkpoint exists for that schema or table.
func (cp *RemoteCheckPoint) GetTableInfo(schema string, table string) *model.TableInfo {
	cp.RLock()
	defer cp.RUnlock()

	// Indexing the (possibly nil) inner map is safe: a missing schema yields a
	// nil map, and a lookup in a nil map simply reports "not found".
	if point, found := cp.points[schema][table]; found {
		return point.TableInfo()
	}
	return nil
}

// FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint.
func (cp *RemoteCheckPoint) FlushedGlobalPoint() binlog.Location {
cp.RLock()
Expand All @@ -700,10 +725,10 @@ func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
}

// Rollback implements CheckPoint.Rollback.
func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) {
func (cp *RemoteCheckPoint) Rollback() {
cp.RLock()
defer cp.RUnlock()
cp.globalPoint.rollback(schemaTracker, "")
cp.globalPoint.rollback()
for schemaName, mSchema := range cp.points {
for tableName, point := range mSchema {
table := &filter.Table{
Expand All @@ -712,38 +737,7 @@ func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) {
}
logger := cp.logCtx.L().WithFields(zap.Stringer("table", table))
logger.Debug("try to rollback checkpoint", log.WrapStringerField("checkpoint", point))
from := point.MySQLLocation()
if point.rollback(schemaTracker, schemaName) {
logger.Info("rollback checkpoint", zap.Stringer("from", from), zap.Stringer("to", point.FlushedMySQLLocation()))
// schema changed
if err := schemaTracker.DropTable(table); err != nil {
logger.Warn("failed to drop table from schema tracker", log.ShortError(err))
}
if point.ti != nil {
// TODO: Figure out how to recover from errors.
if err := schemaTracker.CreateSchemaIfNotExists(schemaName); err != nil {
logger.Error("failed to rollback schema on schema tracker: cannot create schema", log.ShortError(err))
}
if err := schemaTracker.CreateTableIfNotExists(table, point.ti); err != nil {
logger.Error("failed to rollback schema on schema tracker: cannot create table", log.ShortError(err))
}
}
}
}
}

// drop any tables in the tracker if no corresponding checkpoint exists.
for _, schema := range schemaTracker.AllSchemas() {
_, ok1 := cp.points[schema.Name.O]
for _, table := range schema.Tables {
var ok2 bool
if ok1 {
_, ok2 = cp.points[schema.Name.O][table.Name.O]
}
if !ok2 {
err := schemaTracker.DropTable(&filter.Table{Schema: schema.Name.O, Name: table.Name.O})
cp.logCtx.L().Info("drop table in schema tracker because no checkpoint exists", zap.String("schema", schema.Name.O), zap.String("table", table.Name.O), log.ShortError(err))
}
point.rollback()
}
}
}
Expand Down Expand Up @@ -899,6 +893,33 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
return terror.WithScope(terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError), terror.ScopeDownstream)
}

// LoadIntoSchemaTracker loads table infos of all points into schema tracker.
// For every schema in the checkpoint it makes sure the schema exists in the
// tracker, then creates each table from the point's flushed table info.
// The first error returned by the tracker aborts the load and is returned as-is.
func (cp *RemoteCheckPoint) LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error {
	cp.RLock()
	defer cp.RUnlock()

	for schemaName, tablePoints := range cp.points {
		if err := schemaTracker.CreateSchemaIfNotExists(schemaName); err != nil {
			return err
		}
		for tableName, point := range tablePoints {
			// a create database DDL leaves a table point with no table name and
			// no table info; there is nothing to load for such a point.
			if point.flushedTI != nil {
				cp.logCtx.L().Debug("will init table info in schema tracker",
					zap.String("database", schemaName),
					zap.String("table", tableName))
				target := &filter.Table{Schema: schemaName, Name: tableName}
				if err := schemaTracker.CreateTableIfNotExists(target, point.flushedTI); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

// CheckAndUpdate check the checkpoint data consistency and try to fix them if possible.
func (cp *RemoteCheckPoint) CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error {
cp.Lock()
Expand Down
Loading

0 comments on commit 74ac962

Please sign in to comment.