Skip to content

Commit

Permalink
lightning: refine fetching table structure error handling (#44801) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 10, 2023
1 parent ece169b commit 447ed39
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 6 deletions.
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/tidb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ go_library(

go_test(
name = "tidb_test",
timeout = "short",
srcs = ["tidb_test.go"],
flaky = True,
deps = [
":tidb",
"//br/pkg/lightning/backend",
Expand Down
20 changes: 16 additions & 4 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter {
// TODO: refactor
func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
var err error
tables := []*model.TableInfo{}
results := []*model.TableInfo{}
logger := log.FromContext(ctx)
s := common.SQLWithRetry{
DB: b.db,
Logger: log.FromContext(ctx),
Logger: logger,
}

err = s.Transact(ctx, "fetch table columns", func(c context.Context, tx *sql.Tx) error {
Expand All @@ -160,6 +161,7 @@ func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaNam
curColOffset int
curTable *model.TableInfo
)
tables := []*model.TableInfo{}
for rows.Next() {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
Expand Down Expand Up @@ -202,15 +204,24 @@ func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaNam
// shard_row_id/auto random is only available after tidb v4.0.0
// `show table next_row_id` is also not available before tidb v4.0.0
if serverInfo.ServerType != version.ServerTypeTiDB || serverInfo.ServerVersion.Major < 4 {
results = tables
return nil
}

failpoint.Inject(
"FetchRemoteTableModels_BeforeFetchTableAutoIDInfos",
func() {
fmt.Println("failpoint: FetchRemoteTableModels_BeforeFetchTableAutoIDInfos")
},
)

// init auto id column for each table
for _, tbl := range tables {
tblName := common.UniqueTable(schemaName, tbl.Name.O)
autoIDInfos, err := FetchTableAutoIDInfos(ctx, tx, tblName)
if err != nil {
return errors.Trace(err)
logger.Warn("fetch table auto ID infos error. Ignore this table and continue.", zap.String("table_name", tblName), zap.Error(err))
continue
}
for _, info := range autoIDInfos {
for _, col := range tbl.Columns {
Expand All @@ -227,10 +238,11 @@ func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaNam
}
}
}
results = append(results, tbl)
}
return nil
})
return tables, err
return results, err
}

// CheckRequirements performs the check whether the backend satisfies the version requirements.
Expand Down
48 changes: 48 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,54 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
}, tableInfos)
}

func TestFetchRemoteTableModelsDropTableHalfway(t *testing.T) {
s := createMysqlSuite(t)
defer s.TearDownTest(t)
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow(`Release Version: v99.0.0`)) // this is a fake version number
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("tbl01", "id", "bigint(20)", "", "auto_increment").
AddRow("tbl01", "val", "varchar(255)", "", "").
AddRow("tbl02", "id", "bigint(20)", "", "auto_increment").
AddRow("tbl02", "val", "varchar(255)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`tbl01` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "tbl01", "id", int64(1), "_TIDB_ROWID").
AddRow("test", "tbl01", "id", int64(1), "AUTO_INCREMENT"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`tbl02` NEXT_ROW_ID").
WillReturnError(mysql.NewErr(mysql.ErrNoSuchTable, "test", "tbl02"))
s.mockDB.ExpectCommit()

infoGetter := tidb.NewTargetInfoGetter(s.dbHandle)
tableInfos, err := infoGetter.FetchRemoteTableModels(context.Background(), "test")
require.NoError(t, err)
ft := types.FieldType{}
ft.SetFlag(mysql.AutoIncrementFlag)
require.Equal(t, []*model.TableInfo{
{
Name: model.NewCIStr("tbl01"),
State: model.StatePublic,
PKIsHandle: true,
Columns: []*model.ColumnInfo{
{
Name: model.NewCIStr("id"),
Offset: 0,
State: model.StatePublic,
FieldType: ft,
},
{
Name: model.NewCIStr("val"),
Offset: 1,
State: model.StatePublic,
},
},
},
}, tableInfos)
}

func TestWriteRowsErrorNoRetry(t *testing.T) {
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/restore/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,19 @@ func (p *PreRestoreInfoGetterImpl) GetAllTableStructures(ctx context.Context, op

func (p *PreRestoreInfoGetterImpl) getTableStructuresByFileMeta(ctx context.Context, dbSrcFileMeta *mydump.MDDatabaseMeta, getPreInfoCfg *ropts.GetPreInfoConfig) ([]*model.TableInfo, error) {
dbName := dbSrcFileMeta.Name
failpoint.Inject(
"getTableStructuresByFileMeta_BeforeFetchRemoteTableModels",
func(v failpoint.Value) {
fmt.Println("failpoint: getTableStructuresByFileMeta_BeforeFetchRemoteTableModels")
const defaultMilliSeconds int = 5000
sleepMilliSeconds, ok := v.(int)
if !ok || sleepMilliSeconds <= 0 || sleepMilliSeconds > 30000 {
sleepMilliSeconds = defaultMilliSeconds
}
//nolint: errcheck
failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/tidb/FetchRemoteTableModels_BeforeFetchTableAutoIDInfos", fmt.Sprintf("sleep(%d)", sleepMilliSeconds))
},
)
currentTableInfosFromDB, err := p.targetInfoGetter.FetchRemoteTableModels(ctx, dbName)
if err != nil {
if getPreInfoCfg != nil && getPreInfoCfg.IgnoreDBNotExist {
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_drop_other_tables_halfway/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = "tidb"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE tbl01 (
id INT PRIMARY KEY AUTO_INCREMENT,
val VARCHAR(64)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,val
1,aaa
2,bbb
3,ccc
4,ddd
5,eee
28 changes: 28 additions & 0 deletions br/tests/lightning_drop_other_tables_halfway/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/sh
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

run_sql 'DROP DATABASE IF EXISTS lntest'
run_sql 'CREATE DATABASE lntest'
run_sql 'CREATE TABLE lntest.tbl02 (id BIGINT PRIMARY KEY AUTO_INCREMENT, val VARCHAR(255));'

GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/getTableStructuresByFileMeta_BeforeFetchRemoteTableModels=return(6000)" run_lightning &
pid=$!

sleep 4
run_sql 'DROP TABLE lntest.tbl02;' && echo "table dropped"
wait "${pid}"

0 comments on commit 447ed39

Please sign in to comment.