Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
fix no-schemas and TiDBRowID usage
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu committed Jul 9, 2021
1 parent 088b5ba commit 09d257d
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 204 deletions.
12 changes: 11 additions & 1 deletion tests/views/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@ run_dumpling --no-views
file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema.sql"
file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema-view.sql"

rm -rf $DUMPLING_OUTPUT_DIR
run_dumpling --no-views=false
#diff "$DUMPLING_BASE_NAME/data/views-schema-create.sql" "$DUMPLING_OUTPUT_DIR/views-schema-create.sql"
diff "$DUMPLING_BASE_NAME/data/views.v-schema.sql" "$DUMPLING_OUTPUT_DIR/views.v-schema.sql"
diff "$DUMPLING_BASE_NAME/data/views.v-schema-view.sql" "$DUMPLING_OUTPUT_DIR/views.v-schema-view.sql"
file_not_exist "$DUMPLING_BASE_NAME/data/views.v.000000000.sql"
file_not_exist "$DUMPLING_OUTPUT_DIR/views.v.000000000.sql"

# test --no-schemas
rm -rf $DUMPLING_OUTPUT_DIR
run_dumpling --no-schemas
file_not_exist "$DUMPLING_OUTPUT_DIR/views-schema-create.sql"
file_not_exist "$DUMPLING_OUTPUT_DIR/views.t-schema.sql"
file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema.sql"
file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema-view.sql"
file_not_exist "$DUMPLING_OUTPUT_DIR/views.v.000000000.sql"
98 changes: 55 additions & 43 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,16 @@ func (d *Dumper) dumpDatabases(tctx *tcontext.Context, metaConn *sql.Conn, taskC
conf := d.conf
allTables := conf.Tables
for dbName, tables := range allTables {
createDatabaseSQL, err := ShowCreateDatabase(metaConn, dbName)
if err != nil {
return err
}
task := NewTaskDatabaseMeta(dbName, createDatabaseSQL)
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
return tctx.Err()
if !conf.NoSchemas {
createDatabaseSQL, err := ShowCreateDatabase(metaConn, dbName)
if err != nil {
return err
}
task := NewTaskDatabaseMeta(dbName, createDatabaseSQL)
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
return tctx.Err()
}
}

for _, table := range tables {
Expand All @@ -304,18 +306,22 @@ func (d *Dumper) dumpDatabases(tctx *tcontext.Context, metaConn *sql.Conn, taskC
return err
}

if table.Type == TableTypeView {
task := NewTaskViewMeta(dbName, table.Name, meta.ShowCreateTable(), meta.ShowCreateView())
ctxDone = d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
return tctx.Err()
}
} else {
task := NewTaskTableMeta(dbName, table.Name, meta.ShowCreateTable())
ctxDone = d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
return tctx.Err()
if !conf.NoSchemas {
if table.Type == TableTypeView {
task := NewTaskViewMeta(dbName, table.Name, meta.ShowCreateTable(), meta.ShowCreateView())
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
return tctx.Err()
}
} else {
task := NewTaskTableMeta(dbName, table.Name, meta.ShowCreateTable())
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
return tctx.Err()
}
}
}
if table.Type == TableTypeBase {
err = d.dumpTableData(tctx, metaConn, meta, taskChan)
if err != nil {
return err
Expand Down Expand Up @@ -429,7 +435,7 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()))
}
orderByClause, err := buildOrderByClause(conf, conn, meta.DatabaseName(), meta.TableName())
orderByClause, err := buildOrderByClause(conf, conn, meta.DatabaseName(), meta.TableName(), meta.HasImplicitRowID())
if err != nil {
return err
}
Expand All @@ -456,12 +462,12 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
zap.String("database", db), zap.String("table", tbl), zap.Error(err))
}

orderByClause, err := buildOrderByClause(conf, conn, db, tbl)
orderByClause, err := buildOrderByClause(conf, conn, db, tbl, meta.HasImplicitRowID())
if err != nil {
return err
}

field, err := pickupPossibleField(meta, conn, conf)
field, err := pickupPossibleField(meta, conn)
if err != nil || field == "" {
// skip split chunk logic if not found proper field
tctx.L().Warn("fallback to sequential dump due to no proper field",
Expand Down Expand Up @@ -576,7 +582,7 @@ func (d *Dumper) selectMinAndMaxIntValue(conn *sql.Conn, db, tbl, field string)
}

func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error {
db, tbl := meta.DatabaseName(), meta.TableName()
db, tbl, hasImplicitRowID := meta.DatabaseName(), meta.TableName(), meta.HasImplicitRowID()

var (
handleColNames []string
Expand All @@ -586,15 +592,15 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn
if d.conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 {
tctx.L().Debug("dumping TiDB tables with TABLESAMPLE",
zap.String("database", db), zap.String("table", tbl))
handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, db, tbl)
handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, db, tbl, hasImplicitRowID)
} else {
tctx.L().Debug("dumping TiDB tables with TABLE REGIONS",
zap.String("database", db), zap.String("table", tbl))
var partitions []string
partitions, err = GetPartitionNames(conn, db, tbl)
if err == nil {
if len(partitions) == 0 {
handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl)
handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl, hasImplicitRowID)
} else {
return d.concurrentDumpTiDBPartitionTables(tctx, conn, meta, taskChan, partitions)
}
Expand All @@ -607,15 +613,15 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn
}

func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partitions []string) error {
db, tbl := meta.DatabaseName(), meta.TableName()
db, tbl, hasImplicitRowID := meta.DatabaseName(), meta.TableName(), meta.HasImplicitRowID()
tctx.L().Debug("dumping TiDB tables with TABLE REGIONS for partition table",
zap.String("database", db), zap.String("table", tbl), zap.Strings("partitions", partitions))

startChunkIdx := 0
totalChunk := 0
cachedHandleVals := make([][][]string, len(partitions))

handleColNames, _, err := selectTiDBRowKeyFields(conn, db, tbl, checkTiDBTableRegionPkFields)
handleColNames, _, err := selectTiDBRowKeyFields(conn, db, tbl, hasImplicitRowID, checkTiDBTableRegionPkFields)
if err != nil {
return err
}
Expand Down Expand Up @@ -670,8 +676,8 @@ func (d *Dumper) L() log.Logger {
return d.tctx.L()
}

func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) {
pkFields, pkColTypes, err := selectTiDBRowKeyFields(conn, dbName, tableName, nil)
func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string, hasImplicitRowID bool) (pkFields []string, pkVals [][]string, err error) {
pkFields, pkColTypes, err := selectTiDBRowKeyFields(conn, dbName, tableName, hasImplicitRowID, nil)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -715,11 +721,7 @@ func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string
return fmt.Sprintf(template, pks, escapeString(dbName), escapeString(tblName), pks)
}

func selectTiDBRowKeyFields(conn *sql.Conn, dbName, tableName string, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) {
hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName)
if err != nil {
return
}
func selectTiDBRowKeyFields(conn *sql.Conn, dbName, tableName string, hasImplicitRowID bool, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) {
if hasImplicitRowID {
pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"}
} else {
Expand All @@ -744,8 +746,8 @@ func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) {
return
}

func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) {
pkFields, _, err = selectTiDBRowKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields)
func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string, hasImplicitRowID bool) (pkFields []string, pkVals [][]string, err error) {
pkFields, _, err = selectTiDBRowKeyFields(conn, dbName, tableName, hasImplicitRowID, checkTiDBTableRegionPkFields)
if err != nil {
return
}
Expand Down Expand Up @@ -860,8 +862,17 @@ func dumpTableMeta(conf *Config, conn *sql.Conn, db string, table *TableInfo) (T
if err != nil {
return nil, err
}
var (
colTypes []*sql.ColumnType
hasImplicitRowID bool
)
if conf.ServerInfo.ServerType == ServerTypeTiDB {
hasImplicitRowID, err = SelectTiDBRowID(conn, db, tbl)
if err != nil {
return nil, err
}
}

var colTypes []*sql.ColumnType
// If all columns are generated
if selectField == "" {
colTypes, err = GetColumnTypes(conn, "*", db, tbl)
Expand All @@ -873,12 +884,13 @@ func dumpTableMeta(conf *Config, conn *sql.Conn, db string, table *TableInfo) (T
}

meta := &tableMeta{
avgRowLength: table.AvgRowLength,
database: db,
table: tbl,
colTypes: colTypes,
selectedField: selectField,
selectedLen: selectLen,
avgRowLength: table.AvgRowLength,
database: db,
table: tbl,
colTypes: colTypes,
selectedField: selectField,
selectedLen: selectLen,
hasImplicitRowID: hasImplicitRowID,
specCmts: []string{
"/*!40101 SET NAMES binary*/;",
},
Expand Down
39 changes: 39 additions & 0 deletions v4/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,42 @@ func (s *testSQLSuite) TestDumpBlock(c *C) {
c.Assert(errors.ErrorEqual(d.dumpDatabases(writerCtx, conn, taskChan), context.Canceled), IsTrue)
c.Assert(errors.ErrorEqual(wg.Wait(), writerErr), IsTrue)
}

func (s *testSQLSuite) TestDumpTableMeta(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

database := "foo"
table := "bar"

tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
defer cancel()
conn, err := db.Conn(tctx)
c.Assert(err, IsNil)
conf := DefaultConfig()
conf.NoSchemas = true

for serverType := ServerTypeUnknown; serverType < ServerTypeAll; serverType++ {
conf.ServerInfo.ServerType = ServerType(serverType)
hasImplicitRowID := false
mock.ExpectQuery("SELECT COLUMN_NAME").
WithArgs(database, table).
WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", ""))
if serverType == ServerTypeTiDB {
mock.ExpectExec("SELECT _tidb_rowid from").
WillReturnResult(sqlmock.NewResult(0, 0))
hasImplicitRowID = true
}
mock.ExpectQuery(fmt.Sprintf("SELECT \\* FROM `%s`.`%s`", database, table)).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
meta, err := dumpTableMeta(conf, conn, database, &TableInfo{Type: TableTypeBase, Name: table})
c.Assert(err, IsNil)
c.Assert(meta.DatabaseName(), Equals, database)
c.Assert(meta.TableName(), Equals, table)
c.Assert(meta.SelectedField(), Equals, "*")
c.Assert(meta.SelectedLen(), Equals, 1)
c.Assert(meta.ShowCreateTable(), Equals, "")
c.Assert(meta.HasImplicitRowID(), Equals, hasImplicitRowID)
}
}
1 change: 1 addition & 0 deletions v4/export/ir.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type TableMeta interface {
ShowCreateTable() string
ShowCreateView() string
AvgRowLength() uint64
HasImplicitRowID() bool
}

// SQLRowIter is the iterator on a collection of sql.Row.
Expand Down
23 changes: 14 additions & 9 deletions v4/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,16 @@ func (td *tableData) RawRows() *sql.Rows {
}

type tableMeta struct {
database string
table string
colTypes []*sql.ColumnType
selectedField string
selectedLen int
specCmts []string
showCreateTable string
showCreateView string
avgRowLength uint64
database string
table string
colTypes []*sql.ColumnType
selectedField string
selectedLen int
specCmts []string
showCreateTable string
showCreateView string
avgRowLength uint64
hasImplicitRowID bool
}

func (tm *tableMeta) ColumnTypes() []string {
Expand Down Expand Up @@ -312,6 +313,10 @@ func (tm *tableMeta) AvgRowLength() uint64 {
return tm.avgRowLength
}

func (tm *tableMeta) HasImplicitRowID() bool {
return tm.hasImplicitRowID
}

type metaData struct {
target string
metaSQL string
Expand Down
27 changes: 7 additions & 20 deletions v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,18 +284,12 @@ func buildSelectQuery(database, table, fields, partition, where, orderByClause s
return query.String()
}

func buildOrderByClause(conf *Config, db *sql.Conn, database, table string) (string, error) {
func buildOrderByClause(conf *Config, db *sql.Conn, database, table string, hasImplicitRowID bool) (string, error) {
if !conf.SortByPk {
return "", nil
}
if conf.ServerInfo.ServerType == ServerTypeTiDB {
ok, err := SelectTiDBRowID(db, database, table)
if err != nil {
return "", errors.Trace(err)
}
if ok {
return orderByTiDBRowID, nil
}
if hasImplicitRowID {
return orderByTiDBRowID, nil
}
cols, err := GetPrimaryKeyColumns(db, database, table)
if err != nil {
Expand Down Expand Up @@ -911,17 +905,10 @@ func simpleQueryWithArgs(conn *sql.Conn, handleOneRow func(*sql.Rows) error, sql
return errors.Annotatef(rows.Err(), "sql: %s", sql)
}

func pickupPossibleField(meta TableMeta, db *sql.Conn, conf *Config) (string, error) {
dbName, tableName := meta.DatabaseName(), meta.TableName()
// If detected server is TiDB, try using _tidb_rowid
if conf.ServerInfo.ServerType == ServerTypeTiDB {
ok, err := SelectTiDBRowID(db, dbName, tableName)
if err != nil {
return "", err
}
if ok {
return "_tidb_rowid", nil
}
func pickupPossibleField(meta TableMeta, db *sql.Conn) (string, error) {
// try using _tidb_rowid first
if meta.HasImplicitRowID() {
return "_tidb_rowid", nil
}
// try to use pk
fieldName, err := getNumericIndex(db, meta)
Expand Down
Loading

0 comments on commit 09d257d

Please sign in to comment.