diff --git a/Makefile b/Makefile index 592fbf51..38a312e2 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,9 @@ LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitHash=$(shell git rev-parse LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitBranch=$(shell git rev-parse --abbrev-ref HEAD)" LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GoVersion=$(shell go version)" +FAILPOINT_ENABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl enable) +FAILPOINT_DISABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl disable) + GO = go GOLDFLAGS = -ldflags '$(LDFLAGS)' ifeq ("$(WITH_RACE)", "1") @@ -17,8 +20,21 @@ build: bin/dumpling bin/%: cmd/%/main.go $(wildcard v4/**/*.go) $(GO) build $(GOLDFLAGS) -tags codes -o $@ $< -test: - $(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic +test: failpoint-enable + $(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic ||{ $(FAILPOINT_DISABLE); exit 1; } + @make failpoint-disable + +integration_test: failpoint-enable bin/dumpling + @make failpoint-disable + ./tests/run.sh ||{ $(FAILPOINT_DISABLE); exit 1; } + +bin/failpoint-ctl: go.mod + $(GO) build -o $@ github.com/pingcap/failpoint/failpoint-ctl + +failpoint-enable: bin/failpoint-ctl +# Converting gofail failpoints... + @$(FAILPOINT_ENABLE) -integration_test: bin/dumpling - ./tests/run.sh +failpoint-disable: bin/failpoint-ctl +# Restoring gofail failpoints... + @$(FAILPOINT_DISABLE) diff --git a/go.mod b/go.mod index 6703c4b2..e4aeccae 100644 --- a/go.mod +++ b/go.mod @@ -9,13 +9,13 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 + github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce github.com/pingcap/log v0.0.0-20200511115504-543df19646ad github.com/pingcap/pd/v4 v4.0.0 github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible github.com/pkg/errors v0.9.1 github.com/soheilhy/cmux v0.1.4 github.com/spf13/pflag v1.0.3 - github.com/stretchr/testify v1.5.1 // indirect go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 go.uber.org/zap v1.14.0 golang.org/x/mod v0.3.0 // indirect diff --git a/go.sum b/go.sum index 8cf84082..802c4afe 100644 --- a/go.sum +++ b/go.sum @@ -228,9 +228,12 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik= github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0hl9LcYo6cZoGBGiLtefMQMF/vo3XLgQ= github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= +github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30= +github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4= @@ -270,6 +273,7 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= diff --git a/tests/consistency/run.sh b/tests/consistency/run.sh new file mode 100644 index 00000000..fe5379c6 --- /dev/null +++ b/tests/consistency/run.sh @@ -0,0 +1,60 @@ +#!/bin/sh + +set -eu +cur=$(cd `dirname $0`; pwd) + +DB_NAME="mysql_consistency" +TABLE_NAME="t" + +# drop database on mysql +run_sql "drop database if exists \`$DB_NAME\`;" + +# build data on mysql +run_sql "create database $DB_NAME;" +run_sql "create table $DB_NAME.$TABLE_NAME (a int(255));" + +# insert 100 records +run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");" + +# dumping with consistency flush +export DUMPLING_TEST_DATABASE=$DB_NAME +export GO_FAILPOINTS="github.com/pingcap/dumpling/v4/export/ConsistencyCheck=1*sleep(5000)" +run_dumpling & +# wait dumpling process to start to sleep +sleep 2 + +# record metadata info +metadata=`run_sql "show master status;"` +metaLog=`echo $metadata | awk -F 'File:' '{print $2}' | awk '{print $1}'` +metaPos=`echo $metadata | awk -F 'Position:' '{print $2}' | awk '{print $1}'` +metaGTID=`echo $metadata | awk -F 'Executed_Gtid_Set:' '{print $2}' | awk '{print $1}'` +# insert 100 more records, test whether dumpling will dump these data out +run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");" + +wait + +# check data record count +cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l` +echo "1st records count is ${cnt}" +[ $cnt = 100 ] + +# check metadata +echo "metaLog: $metaLog" +echo "metaPos: $metaPos" +echo "metaGTID: $metaGTID" +if [ $metaLog != "" ]; then +[ `grep -o "Log: $metaLog" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ] +fi +if [ $metaPos != "" ]; then +[ `grep -o "Pos: $metaPos" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ] +fi +if [ $metaGTID != "" ]; then +[ `grep -o "GTID: $metaGTID" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ] +fi + +# test dumpling normally +export GO_FAILPOINTS="" +run_dumpling +cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l` +echo "2nd records count is ${cnt}" +[ $cnt = 200 ] diff --git a/v4/export/consistency.go b/v4/export/consistency.go index 02809889..5eb0fdee 100644 --- a/v4/export/consistency.go +++ b/v4/export/consistency.go @@ -1,22 +1,27 @@ package export import ( + "context" "database/sql" "errors" "fmt" ) -func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyController, error) { +func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) { resolveAutoConsistency(conf) + conn, err := session.Conn(ctx) + if err != nil { + return nil, err + } switch conf.Consistency { case "flush": return &ConsistencyFlushTableWithReadLock{ serverType: conf.ServerInfo.ServerType, - db: session, + conn: conn, }, nil case "lock": return &ConsistencyLockDumpingTables{ - db: session, + conn: conn, allTables: conf.Tables, }, nil case "snapshot": @@ -32,49 +37,46 @@ func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyControl } type ConsistencyController interface { - Setup() error - TearDown() error + Setup(context.Context) error + TearDown(context.Context) error } type ConsistencyNone struct{} -func (c *ConsistencyNone) Setup() error { +func (c *ConsistencyNone) Setup(_ context.Context) error { return nil } -func (c *ConsistencyNone) TearDown() error { +func (c *ConsistencyNone) TearDown(_ context.Context) error { return nil } type ConsistencyFlushTableWithReadLock struct { serverType ServerType - db *sql.DB + conn *sql.Conn } -func (c *ConsistencyFlushTableWithReadLock) Setup() error { +func (c *ConsistencyFlushTableWithReadLock) Setup(ctx context.Context) error { if c.serverType == ServerTypeTiDB { return withStack(errors.New("'flush table with read lock' cannot be used to ensure the consistency in TiDB")) } - return FlushTableWithReadLock(c.db) + return FlushTableWithReadLock(ctx, c.conn) } -func (c *ConsistencyFlushTableWithReadLock) TearDown() error { - err := c.db.Ping() - if err != nil { - return withStack(errors.New("ConsistencyFlushTableWithReadLock lost database connection")) - } - return UnlockTables(c.db) +func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error { + defer c.conn.Close() + return UnlockTables(ctx, c.conn) } type ConsistencyLockDumpingTables struct { - db *sql.DB + conn *sql.Conn allTables DatabaseTables } -func (c *ConsistencyLockDumpingTables) Setup() error { +func (c *ConsistencyLockDumpingTables) Setup(ctx context.Context) error { for dbName, tables := range c.allTables { for _, table := range tables { - err := LockTables(c.db, dbName, table.Name) + err := LockTables(ctx, c.conn, dbName, table.Name) if err != nil { return err } @@ -83,12 +85,9 @@ func (c *ConsistencyLockDumpingTables) Setup() error { return nil } -func (c *ConsistencyLockDumpingTables) TearDown() error { - err := c.db.Ping() - if err != nil { - return withStack(errors.New("ConsistencyLockDumpingTables lost database connection")) - } - return UnlockTables(c.db) +func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error { + defer c.conn.Close() + return UnlockTables(ctx, c.conn) } const showMasterStatusFieldNum = 5 diff --git a/v4/export/consistency_test.go b/v4/export/consistency_test.go index 0e756f5c..71999e67 100644 --- a/v4/export/consistency_test.go +++ b/v4/export/consistency_test.go @@ -1,6 +1,7 @@ package export import ( + "context" "errors" "strings" @@ -18,41 +19,43 @@ func (s *testConsistencySuite) assertNil(err error, c *C) { } } -func (s *testConsistencySuite) assertLifetimeErrNil(ctrl ConsistencyController, c *C) { - s.assertNil(ctrl.Setup(), c) - s.assertNil(ctrl.TearDown(), c) +func (s *testConsistencySuite) assertLifetimeErrNil(ctx context.Context, ctrl ConsistencyController, c *C) { + s.assertNil(ctrl.Setup(ctx), c) + s.assertNil(ctrl.TearDown(ctx), c) } func (s *testConsistencySuite) TestConsistencyController(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) defer db.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() conf := DefaultConfig() resultOk := sqlmock.NewResult(0, 1) conf.Consistency = "none" - ctrl, _ := NewConsistencyController(conf, db) + ctrl, _ := NewConsistencyController(ctx, conf, db) _, ok := ctrl.(*ConsistencyNone) c.Assert(ok, IsTrue) - s.assertLifetimeErrNil(ctrl, c) + s.assertLifetimeErrNil(ctx, ctrl, c) conf.Consistency = "flush" mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk) mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk) - ctrl, _ = NewConsistencyController(conf, db) + ctrl, _ = NewConsistencyController(ctx, conf, db) _, ok = ctrl.(*ConsistencyFlushTableWithReadLock) c.Assert(ok, IsTrue) - s.assertLifetimeErrNil(ctrl, c) + s.assertLifetimeErrNil(ctx, ctrl, c) if err = mock.ExpectationsWereMet(); err != nil { c.Fatalf(err.Error()) } conf.Consistency = "snapshot" conf.ServerInfo.ServerType = ServerTypeTiDB - ctrl, _ = NewConsistencyController(conf, db) + ctrl, _ = NewConsistencyController(ctx, conf, db) _, ok = ctrl.(*ConsistencyNone) c.Assert(ok, IsTrue) - s.assertLifetimeErrNil(ctrl, c) + s.assertLifetimeErrNil(ctx, ctrl, c) conf.Consistency = "lock" conf.Tables = NewDatabaseTables(). @@ -62,10 +65,10 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) { mock.ExpectExec("LOCK TABLES").WillReturnResult(resultOk) } mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk) - ctrl, _ = NewConsistencyController(conf, db) + ctrl, _ = NewConsistencyController(ctx, conf, db) _, ok = ctrl.(*ConsistencyLockDumpingTables) c.Assert(ok, IsTrue) - s.assertLifetimeErrNil(ctrl, c) + s.assertLifetimeErrNil(ctx, ctrl, c) if err = mock.ExpectationsWereMet(); err != nil { c.Fatalf(err.Error()) } @@ -96,31 +99,33 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) defer db.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() conf := DefaultConfig() conf.Consistency = "invalid_str" - _, err = NewConsistencyController(conf, db) + _, err = NewConsistencyController(ctx, conf, db) c.Assert(err, NotNil) c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue) // snapshot consistency is only available in TiDB conf.Consistency = "snapshot" conf.ServerInfo.ServerType = ServerTypeUnknown - _, err = NewConsistencyController(conf, db) + _, err = NewConsistencyController(ctx, conf, db) c.Assert(err, NotNil) // flush consistency is unavailable in TiDB conf.Consistency = "flush" conf.ServerInfo.ServerType = ServerTypeTiDB - ctrl, _ := NewConsistencyController(conf, db) - err = ctrl.Setup() + ctrl, _ := NewConsistencyController(ctx, conf, db) + err = ctrl.Setup(ctx) c.Assert(err, NotNil) // lock table fail conf.Consistency = "lock" conf.Tables = NewDatabaseTables().AppendTables("db", "t") mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New("")) - ctrl, _ = NewConsistencyController(conf, db) - err = ctrl.Setup() + ctrl, _ = NewConsistencyController(ctx, conf, db) + err = ctrl.Setup(ctx) c.Assert(err, NotNil) } diff --git a/v4/export/dump.go b/v4/export/dump.go old mode 100644 new mode 100755 index e42e4a0f..9b2671ff --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/dumpling/v4/log" _ "github.com/go-sql-driver/mysql" + "github.com/pingcap/failpoint" pd "github.com/pingcap/pd/v4/client" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -133,11 +134,11 @@ func Dump(pCtx context.Context, conf *Config) (err error) { conn.Close() } - conCtrl, err := NewConsistencyController(conf, pool) + conCtrl, err := NewConsistencyController(ctx, conf, pool) if err != nil { return err } - if err = conCtrl.Setup(); err != nil { + if err = conCtrl.Setup(ctx); err != nil { return err } @@ -146,10 +147,6 @@ func Dump(pCtx context.Context, conf *Config) (err error) { return err } - if err = conCtrl.TearDown(); err != nil { - return err - } - // for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached // for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot. if conf.Consistency != "lock" { @@ -165,6 +162,12 @@ func Dump(pCtx context.Context, conf *Config) (err error) { connectPool.releaseConn(conn) } + if err = conCtrl.TearDown(ctx); err != nil { + return err + } + + failpoint.Inject("ConsistencyCheck", nil) + var writer Writer switch strings.ToLower(conf.FileType) { case "sql": diff --git a/v4/export/sql.go b/v4/export/sql.go index a5bfbfd0..07a7b6e3 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -164,6 +164,7 @@ func SelectAllFromTable(conf *Config, db *sql.Conn, database, table string) (Tab } func SelectFromSql(conf *Config, db *sql.Conn) (TableDataIR, error) { + log.Info("dump data from sql", zap.String("sql", conf.Sql)) rows, err := db.QueryContext(context.Background(), conf.Sql) if err != nil { return nil, withStack(errors.WithMessage(err, conf.Sql)) @@ -295,18 +296,18 @@ func GetUniqueIndexName(db *sql.Conn, database, table string) (string, error) { return colName, nil } -func FlushTableWithReadLock(db *sql.DB) error { - _, err := db.Exec("FLUSH TABLES WITH READ LOCK") +func FlushTableWithReadLock(ctx context.Context, db *sql.Conn) error { + _, err := db.ExecContext(ctx, "FLUSH TABLES WITH READ LOCK") return withStack(err) } -func LockTables(db *sql.DB, database, table string) error { - _, err := db.Exec(fmt.Sprintf("LOCK TABLES `%s`.`%s` READ", escapeString(database), escapeString(table))) +func LockTables(ctx context.Context, db *sql.Conn, database, table string) error { + _, err := db.ExecContext(ctx, fmt.Sprintf("LOCK TABLES `%s`.`%s` READ", escapeString(database), escapeString(table))) return withStack(err) } -func UnlockTables(db *sql.DB) error { - _, err := db.Exec("UNLOCK TABLES") +func UnlockTables(ctx context.Context, db *sql.Conn) error { + _, err := db.ExecContext(ctx, "UNLOCK TABLES") return withStack(err) }