Skip to content

Commit

Permalink
sync-diff-inspector: skip validation for tables that exist only upstr…
Browse files Browse the repository at this point in the history
…eam or downstream and print skipped information in summary and progress (#693)

ref #692
  • Loading branch information
liumengya94 authored Jan 30, 2023
1 parent 5c7c9ee commit b4be17a
Show file tree
Hide file tree
Showing 17 changed files with 485 additions and 113 deletions.
3 changes: 3 additions & 0 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ type Config struct {
CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"`
// experimental feature: only check table data without table struct
CheckDataOnly bool `toml:"check-data-only" json:"-"`
// skip validation for tables that don't exist upstream or downstream
SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"`
// DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261"
DMAddr string `toml:"dm-addr" json:"dm-addr"`
// DMTask string `toml:"dm-task" json:"dm-task"`
Expand Down Expand Up @@ -411,6 +413,7 @@ func NewConfig() *Config {
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 4, "how many goroutines are created to check data")
fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum")
fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data")
fs.BoolVar(&cfg.SkipNonExistingTable, "skip-non-existing-table", false, "skip validation for tables that don't exist upstream or downstream")
fs.BoolVar(&cfg.CheckDataOnly, "check-data-only", false, "ignore check table's struct")

_ = fs.MarkHidden("check-data-only")
Expand Down
28 changes: 20 additions & 8 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,16 @@ func (df *Diff) StructEqual(ctx context.Context) error {
tableIndex = df.startRange.ChunkRange.Index.TableIndex
}
for ; tableIndex < len(tables); tableIndex++ {
isEqual, isSkip, err := df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack
if common.AllTableExist(isAllTableExist) {
var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
}
}
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip)
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist)
}
return nil
}
Expand Down Expand Up @@ -411,12 +415,21 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
node: rangeInfo.ToNode(),
}
defer func() { df.sqlCh <- dml }()
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table
id := rangeInfo.ChunkRange.Index
if rangeInfo.ChunkRange.Type == chunk.Empty {
dml.node.State = checkpoints.IgnoreState
// for tables that don't exist upstream or downstream
if !common.AllTableExist(tableDiff.TableLack) {
upCount := df.upstream.GetCountForLackTable(ctx, rangeInfo)
downCount := df.downstream.GetCountForLackTable(ctx, rangeInfo)
df.report.SetTableDataCheckResult(schema, table, false, int(upCount), int(downCount), upCount, downCount, id)
return false
}
return true
}
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table

var state string = checkpoints.SuccessState

isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo)
Expand Down Expand Up @@ -447,7 +460,6 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
isEqual = isDataEqual
}
dml.node.State = state
id := rangeInfo.ChunkRange.Index
df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id)
return isEqual
}
Expand Down
41 changes: 34 additions & 7 deletions sync_diff_inspector/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os"
"strings"
"time"

"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
)

type TableProgressPrinter struct {
Expand Down Expand Up @@ -53,7 +55,9 @@ const (
TABLE_STATE_RESULT_FAIL_STRUCTURE_PASS table_state_t = 0x40
TABLE_STATE_RESULT_DIFFERENT table_state_t = 0x80
TABLE_STATE_HEAD table_state_t = 0xff
TABLE_STATE_RESULT_MASK table_state_t = 0xf0
TABLE_STATE_RESULT_MASK table_state_t = 0xff0
TABLE_STATE_NOT_EXSIT_UPSTREAM table_state_t = 0x100
TABLE_STATE_NOT_EXSIT_DOWNSTREAM table_state_t = 0x200
)

type TableProgress struct {
Expand Down Expand Up @@ -127,11 +131,18 @@ func (tpp *TableProgressPrinter) UpdateTotal(name string, total int, stopUpdate
}
}

func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool) {
func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
var state table_state_t
if isFailed {
if isDone {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
switch isExist {
case common.UpstreamTableLackFlag:
state = TABLE_STATE_NOT_EXSIT_UPSTREAM | TABLE_STATE_REGISTER
case common.DownstreamTableLackFlag:
state = TABLE_STATE_NOT_EXSIT_DOWNSTREAM | TABLE_STATE_REGISTER
default:
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
}
} else {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE | TABLE_STATE_REGISTER
}
Expand Down Expand Up @@ -181,6 +192,7 @@ func (tpp *TableProgressPrinter) PrintSummary() {
tpp.tableNums,
)
} else {
SkippedNum := 0
for p := tpp.tableFailList.Front(); p != nil; p = p.Next() {
tp := p.Value.(*TableProgress)
if tp.state&(TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE|TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE) != 0 {
Expand All @@ -189,10 +201,18 @@ func (tpp *TableProgressPrinter) PrintSummary() {
if tp.state&(TABLE_STATE_RESULT_DIFFERENT) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` is not equal.\n", fixStr, tp.name)
}
if tp.state&(TABLE_STATE_NOT_EXSIT_DOWNSTREAM) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in downstream database.\n", fixStr, tp.name)
SkippedNum++
}
if tp.state&(TABLE_STATE_NOT_EXSIT_UPSTREAM) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in upstream database.\n", fixStr, tp.name)
SkippedNum++
}
}
fixStr = fmt.Sprintf(
"%s\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
fixStr,
"%s\nThe rest of the tables are all equal.\nA total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
fixStr, tpp.tableNums, tpp.tableNums-tpp.tableFailList.Len(), tpp.tableFailList.Len()-SkippedNum, SkippedNum,
)
}

Expand Down Expand Up @@ -337,6 +357,13 @@ func (tpp *TableProgressPrinter) flush(stateIsChanged bool) {
tpp.lines++
tpp.progressTableNums++
tp.state = TABLE_STATE_COMPARING
case TABLE_STATE_NOT_EXSIT_UPSTREAM, TABLE_STATE_NOT_EXSIT_DOWNSTREAM:
dynStr = fmt.Sprintf("%sComparing the table data of `%s` ...skipped\n", dynStr, tp.name)
tpp.tableFailList.PushBack(tp)
preNode := p.Prev()
tpp.tableList.Remove(p)
p = preNode
tpp.finishTableNums++
case TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE:
fixStr = fmt.Sprintf("%sComparing the table structure of `%s` ... failure\n", fixStr, tp.name)
tpp.tableFailList.PushBack(tp)
Expand Down Expand Up @@ -410,9 +437,9 @@ func UpdateTotal(name string, total int, stopUpdate bool) {
}
}

func RegisterTable(name string, isFailed bool, isDone bool) {
func RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
if progress_ != nil {
progress_.RegisterTable(name, isFailed, isDone)
progress_.RegisterTable(name, isFailed, isDone, isExist)
}
}

Expand Down
31 changes: 21 additions & 10 deletions sync_diff_inspector/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ import (
"testing"
"time"

"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
"github.com/stretchr/testify/require"
)

func TestProgress(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p := NewTableProgressPrinter(6, 0)
p.RegisterTable("1", true, true, common.AllTableExistFlag)
p.StartTable("1", 50, true)
p.RegisterTable("2", true, false)
p.RegisterTable("2", true, false, common.AllTableExistFlag)
p.StartTable("2", 2, true)
p.Inc("2")
p.RegisterTable("3", false, false)
p.RegisterTable("3", false, false, common.AllTableExistFlag)
p.StartTable("3", 1, false)
p.Inc("2")
p.Inc("3")
Expand All @@ -39,6 +40,10 @@ func TestProgress(t *testing.T) {
p.FailTable("4")
p.Inc("3")
p.Inc("4")
p.RegisterTable("5", true, true, common.UpstreamTableLackFlag)
p.StartTable("5", 1, true)
p.RegisterTable("6", true, true, common.DownstreamTableLackFlag)
p.StartTable("6", 1, true)
time.Sleep(500 * time.Millisecond)
p.Close()
buffer := new(bytes.Buffer)
Expand All @@ -47,18 +52,21 @@ func TestProgress(t *testing.T) {
require.Equal(
t,
buffer.String(),
"\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\n"+
"\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
"\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\nThe data of `5` does not exist in upstream database.\nThe data of `6` does not exist in downstream database.\n"+
"\nThe rest of the tables are all equal.\nA total of 6 tables have been compared, 1 tables finished, 3 tables failed, 2 tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n\n",
)
}

func TestTableError(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p.RegisterTable("1", true, true, common.AllTableExistFlag)
p.StartTable("1", 50, true)
p.RegisterTable("2", true, true)
p.RegisterTable("2", true, true, common.AllTableExistFlag)
p.StartTable("2", 1, true)
p.RegisterTable("3", true, true, common.DownstreamTableLackFlag)
p.StartTable("3", 1, true)

p.Inc("2")
buffer := new(bytes.Buffer)
p.SetOutput(buffer)
Expand All @@ -73,16 +81,19 @@ func TestTableError(t *testing.T) {
"\x1b[2A\x1b[JComparing the table structure of `2` ... failure\n"+
"_____________________________________________________________________________\n"+
"Progress [==============================>------------------------------] 50% 0/0\n"+
"\x1b[2A\x1b[JComparing the table data of `3` ...skipped\n"+
"_____________________________________________________________________________\n"+
"Progress [=============================================>---------------] 75% 0/1\n"+
"\x1b[1A\x1b[J\nError in comparison process:\n[aaa]\n\n"+
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
)
}

func TestAllSuccess(t *testing.T) {
Init(2, 0)
RegisterTable("1", false, false)
RegisterTable("1", false, false, common.AllTableExistFlag)
StartTable("1", 1, true)
RegisterTable("2", false, false)
RegisterTable("2", false, false, common.AllTableExistFlag)
StartTable("2", 1, true)
Inc("1")
Inc("2")
Expand Down
46 changes: 32 additions & 14 deletions sync_diff_inspector/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type TableResult struct {
ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table
UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream
DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream

TableLack int `json:"table-lack"`
}

// ChunkResult save the necessarily information to provide summary information
Expand All @@ -80,6 +80,7 @@ type Report struct {
Result string `json:"-"` // Result is pass or fail
PassNum int32 `json:"-"` // The pass number of tables
FailedNum int32 `json:"-"` // The failed number of tables
SkippedNum int32 `json:"-"` // The skipped number of tables
TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => `table` => `tableResult`
StartTime time.Time `json:"start-time"`
Duration time.Duration `json:"time-duration"`
Expand Down Expand Up @@ -131,6 +132,11 @@ func (r *Report) getDiffRows() [][]string {
}
diffRow := make([]string, 0)
diffRow = append(diffRow, dbutil.TableName(schema, table))
if !common.AllTableExist(result.TableLack) {
diffRow = append(diffRow, "skipped")
} else {
diffRow = append(diffRow, "succeed")
}
if !result.StructEqual {
diffRow = append(diffRow, "false")
} else {
Expand All @@ -154,7 +160,6 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) {
for schema, tableMap := range r.TableResults {
for table := range tableMap {
size, err := utils.GetTableSize(ctx, db, schema, table)

if size == 0 || err != nil {
log.Warn("fail to get the correct size of table, if you want to get the correct size, please analyze the corresponding tables", zap.String("table", dbutil.TableName(schema, table)), zap.Error(err))
} else {
Expand All @@ -166,18 +171,21 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) {

// CommitSummary commit summary info
func (r *Report) CommitSummary() error {
passNum, failedNum := int32(0), int32(0)
passNum, failedNum, skippedNum := int32(0), int32(0), int32(0)
for _, tableMap := range r.TableResults {
for _, result := range tableMap {
if result.StructEqual && result.DataEqual {
passNum++
} else if !common.AllTableExist(result.TableLack) {
skippedNum++
} else {
failedNum++
}
}
}
r.PassNum = passNum
r.FailedNum = failedNum
r.SkippedNum = skippedNum
summaryPath := filepath.Join(r.task.OutputDir, "summary.txt")
summaryFile, err := os.Create(summaryPath)
if err != nil {
Expand Down Expand Up @@ -208,11 +216,11 @@ func (r *Report) CommitSummary() error {
summaryFile.WriteString(tableString.String())
summaryFile.WriteString("\n\n")
}
if r.Result == Fail {
if r.Result == Fail || r.SkippedNum != 0 {
summaryFile.WriteString("The following tables contains inconsistent data\n\n")
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Table", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
table.SetHeader([]string{"Table", "Result", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
diffRows := r.getDiffRows()
for _, v := range diffRows {
table.Append(v)
Expand All @@ -228,26 +236,35 @@ func (r *Report) CommitSummary() error {

func (r *Report) Print(w io.Writer) error {
var summary strings.Builder
if r.Result == Pass {
summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum))
if r.Result == Pass && r.SkippedNum == 0 {
summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum+r.SkippedNum))
summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName))
} else if r.Result == Fail {
} else if r.Result == Fail || r.SkippedNum != 0 {
for schema, tableMap := range r.TableResults {
for table, result := range tableMap {
if !result.StructEqual {
if result.DataSkip {
summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table)))
switch result.TableLack {
case common.UpstreamTableLackFlag:
summary.WriteString(fmt.Sprintf("The data of %s does not exist in upstream database\n", dbutil.TableName(schema, table)))
case common.DownstreamTableLackFlag:
summary.WriteString(fmt.Sprintf("The data of %s does not exist in downstream database\n", dbutil.TableName(schema, table)))
default:
summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table)))
}
} else {
summary.WriteString(fmt.Sprintf("The structure of %s is not equal\n", dbutil.TableName(schema, table)))
}
}
if !result.DataEqual {
if !result.DataEqual && common.AllTableExist(result.TableLack) {
summary.WriteString(fmt.Sprintf("The data of %s is not equal\n", dbutil.TableName(schema, table)))
}
}
}
summary.WriteString("\n")
summary.WriteString("The rest of tables are all equal.\n")
summary.WriteString("\n")
summary.WriteString(fmt.Sprintf("A total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\n", r.FailedNum+r.PassNum+r.SkippedNum, r.PassNum, r.FailedNum, r.SkippedNum))
summary.WriteString(fmt.Sprintf("The patch file has been generated in \n\t'%s/'\n", r.task.FixDir))
summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName))
} else {
Expand Down Expand Up @@ -295,13 +312,14 @@ func (r *Report) Init(tableDiffs []*common.TableDiff, sourceConfig [][]byte, tar
}

// SetTableStructCheckResult sets the struct check result for table.
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool) {
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool, exist int) {
r.Lock()
defer r.Unlock()
tableResult := r.TableResults[schema][table]
tableResult.StructEqual = equal
tableResult.DataSkip = skip
if !equal && r.Result != Error {
tableResult.TableLack = exist
if !equal && common.AllTableExist(tableResult.TableLack) && r.Result != Error {
r.Result = Fail
}
}
Expand All @@ -323,11 +341,11 @@ func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsA
}
result.ChunkMap[id.ToString()].RowsAdd += rowsAdd
result.ChunkMap[id.ToString()].RowsDelete += rowsDelete
if r.Result != Error {
if r.Result != Error && common.AllTableExist(result.TableLack) {
r.Result = Fail
}
}
if !equal && r.Result != Error {
if !equal && common.AllTableExist(result.TableLack) && r.Result != Error {
r.Result = Fail
}
}
Expand Down
Loading

0 comments on commit b4be17a

Please sign in to comment.