Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add back table empty check and add a switch config #30887

Merged
merged 20 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand Down
93 changes: 90 additions & 3 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ package restore
import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -38,16 +44,14 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"

"go.uber.org/zap"
"modernc.org/mathutil"
)

const (
Expand Down Expand Up @@ -1053,3 +1057,86 @@ outloop:
log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered))
return nil
}

func (rc *Controller) checkTableEmpty(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
return nil
}
db, _ := rc.tidbGlue.GetDB()

tableCount := 0
for _, db := range rc.dbMetas {
tableCount += len(db.Tables)
}

var lock sync.Mutex
tableNames := make([]string, 0)
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
// skip tables that have checkpoint
if rc.cfg.Checkpoint.Enable {
_, err := rc.checkpointsDB.Get(gCtx, tblName)
switch {
case err == nil:
continue
case errors.IsNotFound(err):
default:
return errors.Trace(err)
}
}

hasData, err1 := tableContainsData(gCtx, db, tblName)
if err1 != nil {
return err1
}
if hasData {
lock.Lock()
tableNames = append(tableNames, tblName)
lock.Unlock()
}
}
return nil
})
}
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
}

if len(tableNames) > 0 {
// sort the failed names
sort.Slice(tableNames, func(i, j int) bool {
return strings.Compare(tableNames[i], tableNames[j]) < 0
})
glorv marked this conversation as resolved.
Show resolved Hide resolved
msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
rc.checkTemplate.Collect(Critical, false, msg)
}
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
query := "select 1 from " + tableName + " limit 1"
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)

switch {
case err == sql.ErrNoRows:
return false, nil
case err != nil:
return false, errors.Trace(err)
default:
return true, nil
}
}
139 changes: 139 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ package restore

import (
"context"
"database/sql"
"fmt"
"os"
"path/filepath"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"

"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/glue"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -404,6 +407,142 @@ func (s *checkInfoSuite) TestCheckCSVHeader(c *C) {
}
}

func (s *checkInfoSuite) TestCheckTableEmpty(c *C) {
dir := c.MkDir()
cfg := config.NewConfig()
cfg.Checkpoint.Enable = false
dbMetas := []*mydump.MDDatabaseMeta{
{
Name: "test1",
Tables: []*mydump.MDTableMeta{
{
DB: "test1",
Name: "tbl1",
},
{
DB: "test1",
Name: "tbl2",
},
},
},
{
Name: "test2",
Tables: []*mydump.MDTableMeta{
{
DB: "test2",
Name: "tbl1",
},
},
},
}

rc := &Controller{
cfg: cfg,
dbMetas: dbMetas,
checkpointsDB: checkpoints.NewNullCheckpointsDB(),
}

ctx := context.Background()

// test tidb will do nothing
rc.cfg.TikvImporter.Backend = config.BackendTiDB
err := rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)

// test incremental mode
rc.cfg.TikvImporter.Backend = config.BackendLocal
rc.cfg.TikvImporter.IncrementalImport = true
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)

rc.cfg.TikvImporter.IncrementalImport = false
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
mock.MatchExpectationsInOrder(false)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
// not error, need not to init check template
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

// single table contains data
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
tmpl := rc.checkTemplate.(*SimpleTemplate)
c.Assert(len(tmpl.criticalMsgs), Equals, 1)
c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test2`.`tbl1`\\] are not empty")

// multi tables contains data
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
tmpl = rc.checkTemplate.(*SimpleTemplate)
c.Assert(len(tmpl.criticalMsgs), Equals, 1)
c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test1`.`tbl1`, `test2`.`tbl1`\\] are not empty")

// init checkpoint with only two of the three tables
dbInfos := map[string]*checkpoints.TidbDBInfo{
"test1": {
Name: "test1",
Tables: map[string]*checkpoints.TidbTableInfo{
"tbl1": {
Name: "tbl1",
},
},
},
"test2": {
Name: "test2",
Tables: map[string]*checkpoints.TidbTableInfo{
"tbl1": {
Name: "tbl1",
},
},
},
}
rc.cfg.Checkpoint.Enable = true
rc.checkpointsDB = checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))
err = rc.checkpointsDB.Initialize(ctx, cfg, dbInfos)
c.Check(err, IsNil)
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
// only need to check the one that is not in checkpoint
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

func (s *checkInfoSuite) TestLocalResource(c *C) {
dir := c.MkDir()
mockStore, err := storage.NewLocalStorage(dir)
Expand Down
58 changes: 57 additions & 1 deletion br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,9 +1027,65 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum
}

func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) {
return false, false, nil, nil
return true, true, &verify.KVChecksum{}, nil
}

func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

type singleMgrBuilder struct{}

func (b singleMgrBuilder) Init(context.Context) error {
return nil
}

func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
return &singleTaskMetaMgr{
pd: pd,
}
}

func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
return noopTableMetaMgr{}
3pointer marked this conversation as resolved.
Show resolved Hide resolved
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
_, err := action(nil)
return err
}

func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}

func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
return true, true, nil
}

func (m *singleTaskMetaMgr) Cleanup(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) CleanupTask(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) Close() {
}
Loading