Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix CI and deprecate max-error.conflict #45349

Merged
merged 10 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix CI
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Jul 13, 2023
commit 7ebbaf977e4b8e6666d288e5c32c3a5e201b1ba1
11 changes: 5 additions & 6 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,6 @@ func (em *ErrorManager) RecordTypeError(
}
return encodeErr
}
if em.conflictRecordsRemain.Add(-1) < 0 {
return nil
}

if em.db != nil {
errMsg := encodeErr.Error()
Expand Down Expand Up @@ -561,9 +558,11 @@ func (em *ErrorManager) syntaxError() int64 {
}

func (em *ErrorManager) conflictError() int64 {
return em.errorCount(func(maxError *config.MaxError) int64 {
return maxError.Conflict.Load()
})
val := em.conflictErrRemain.Load()
if val < 0 {
val = 0
}
return em.configConflict.Threshold - val
}

func (em *ErrorManager) charsetError() int64 {
Expand Down
88 changes: 63 additions & 25 deletions br/pkg/lightning/errormanager/errormanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"io"
"math/rand"
"strconv"
"strings"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -41,14 +40,14 @@ func TestInit(t *testing.T) {
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup
cfg.App.MaxError.Type.Store(10)
cfg.App.MaxError.Conflict.Store(20)
cfg.Conflict.Threshold = 20
cfg.App.TaskInfoSchemaName = "lightning_errors"

em := New(db, cfg, log.L())
require.False(t, em.conflictV1Enabled)
require.True(t, em.conflictV2Enabled)
require.Equal(t, cfg.App.MaxError.Type.Load(), em.remainingError.Type.Load())
require.Equal(t, cfg.App.MaxError.Conflict.Load(), em.remainingError.Conflict.Load())
require.Equal(t, cfg.Conflict.Threshold, em.conflictErrRemain.Load())

em.remainingError.Type.Store(0)
em.conflictV1Enabled = false
Expand Down Expand Up @@ -182,14 +181,16 @@ func TestResolveAllConflictKeys(t *testing.T) {
func TestErrorMgrHasError(t *testing.T) {
cfg := &config.Config{}
cfg.App.MaxError = config.MaxError{
Syntax: *atomic.NewInt64(100),
Charset: *atomic.NewInt64(100),
Type: *atomic.NewInt64(100),
Conflict: *atomic.NewInt64(100),
Syntax: *atomic.NewInt64(100),
Charset: *atomic.NewInt64(100),
Type: *atomic.NewInt64(100),
}
cfg.Conflict.Threshold = 100
em := &ErrorManager{
configError: &cfg.App.MaxError,
remainingError: cfg.App.MaxError,
configError: &cfg.App.MaxError,
remainingError: cfg.App.MaxError,
configConflict: &cfg.Conflict,
conflictErrRemain: atomic.NewInt64(100),
}

// no field changes, should return false
Expand All @@ -208,29 +209,32 @@ func TestErrorMgrHasError(t *testing.T) {
require.True(t, em.HasError())

em.remainingError = cfg.App.MaxError
em.remainingError.Conflict.Sub(1)
em.conflictErrRemain.Sub(1)
require.True(t, em.HasError())

// change multiple keys
em.remainingError = cfg.App.MaxError
em.remainingError.Syntax.Store(0)
em.remainingError.Charset.Store(0)
em.remainingError.Type.Store(0)
em.remainingError.Conflict.Store(0)
em.conflictErrRemain.Store(0)
require.True(t, em.HasError())
}

func TestErrorMgrErrorOutput(t *testing.T) {
cfg := &config.Config{}
cfg.App.MaxError = config.MaxError{
Syntax: *atomic.NewInt64(100),
Charset: *atomic.NewInt64(100),
Type: *atomic.NewInt64(100),
Conflict: *atomic.NewInt64(100),
Syntax: *atomic.NewInt64(100),
Charset: *atomic.NewInt64(100),
Type: *atomic.NewInt64(100),
}
cfg.Conflict.Threshold = 100

em := &ErrorManager{
configError: &cfg.App.MaxError,
remainingError: cfg.App.MaxError,
configConflict: &cfg.Conflict,
conflictErrRemain: atomic.NewInt64(100),
schemaEscaped: "`error_info`",
conflictV1Enabled: true,
}
Expand All @@ -240,26 +244,60 @@ func TestErrorMgrErrorOutput(t *testing.T) {

em.remainingError.Syntax.Sub(1)
output = em.Output()
checkStr := strings.ReplaceAll(output, "\n", "")
expected := "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|+---+-------------+-------------+--------------------------------+"
require.Equal(t, expected, checkStr)
expected := "\n" +
"Import Data Error Summary: \n" +
"+---+-------------+-------------+--------------------------------+\n" +
"| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" +
"+---+-------------+-------------+--------------------------------+\n" +
"|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" +
"+---+-------------+-------------+--------------------------------+\n"
require.Equal(t, expected, output)

em.remainingError = cfg.App.MaxError
em.remainingError.Syntax.Sub(10)
em.remainingError.Type.Store(10)
output = em.Output()
checkStr = strings.ReplaceAll(output, "\n", "")
expected = "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|+---+-------------+-------------+--------------------------------+"
require.Equal(t, expected, checkStr)
expected = "\n" +
"Import Data Error Summary: \n" +
"+---+-------------+-------------+--------------------------------+\n" +
"| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" +
"+---+-------------+-------------+--------------------------------+\n" +
"|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" +
"|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" +
"+---+-------------+-------------+--------------------------------+\n"
require.Equal(t, expected, output)

// change multiple keys
em.remainingError = cfg.App.MaxError
em.remainingError.Syntax.Store(0)
em.remainingError.Charset.Store(0)
em.remainingError.Type.Store(0)
em.remainingError.Conflict.Store(0)
em.conflictErrRemain.Store(0)
output = em.Output()
expected = "\n" +
"Import Data Error Summary: \n" +
"+---+---------------------+-------------+----------------------------------+\n" +
"| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" +
"+---+---------------------+-------------+----------------------------------+\n" +
"|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" +
"|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" +
"|\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m|\n" +
"|\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v1` \x1b[0m|\n" +
"+---+---------------------+-------------+----------------------------------+\n"
require.Equal(t, expected, output)

em.conflictV2Enabled = true
em.conflictV1Enabled = false
output = em.Output()
checkStr = strings.ReplaceAll(output, "\n", "")
expected = "Import Data Error Summary: +---+---------------------+-------------+----------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+---------------------+-------------+----------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m||\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m||\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v1` \x1b[0m|+---+---------------------+-------------+----------------------------------+"
require.Equal(t, expected, checkStr)
expected = "\n" +
"Import Data Error Summary: \n" +
"+---+---------------------+-------------+----------------------------------+\n" +
"| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" +
"+---+---------------------+-------------+----------------------------------+\n" +
"|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" +
"|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" +
"|\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m|\n" +
"|\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`duplicate_records` \x1b[0m|\n" +
"+---+---------------------+-------------+----------------------------------+\n"
require.Equal(t, expected, output)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[lightning]
max-error-records = 50
[conflict]
max-record-rows = 50

[tikv-importer]
on-duplicate = "replace"
Expand Down