Skip to content

Commit

Permalink
Merge branch 'master' into drop_role
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo authored Nov 14, 2021
2 parents ee7d2d1 + 4324e4e commit 2df1e20
Show file tree
Hide file tree
Showing 286 changed files with 14,198 additions and 12,282 deletions.
28 changes: 15 additions & 13 deletions .github/workflows/br_compatible_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: BR / Compatibility Test

on:
push:
# merged git action
branches:
- master
- 'release-[0-9].[0-9]*'
Expand All @@ -14,19 +15,20 @@ on:
- '!br/docs/**'
- '!br/tests/**'
- '!br/docker/**'
pull_request:
branches:
- master
- 'release-[0-9].[0-9]*'
paths:
- 'br/**'
- '!**.html'
- '!**.md'
- '!CNAME'
- '!LICENSE'
- '!br/docs/**'
- '!br/tests/**'
- '!br/docker/**'
# disable pull request only keep the merge action since it is very costly to run those tests
# pull_request:
# branches:
# - master
# - 'release-[0-9].[0-9]*'
# paths:
# - 'br/**'
# - '!**.html'
# - '!**.md'
# - '!CNAME'
# - '!LICENSE'
# - '!br/docs/**'
# - '!br/tests/**'
# - '!br/docker/**'

# See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency.
concurrency:
Expand Down
23 changes: 21 additions & 2 deletions .github/workflows/compile_br.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ on:
- '!br/docs/**'
- '!br/tests/**'
- '!br/docker/**'
#change trigger policy
pull_request:
types:
- labeled # <--
branches:
- master
- 'release-[0-9].[0-9]*'
Expand All @@ -35,8 +38,25 @@ concurrency:
cancel-in-progress: true

jobs:
compile-windows:
if: github.event_name == 'push' || github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for Windows job
runs-on: windows-latest
steps:
- uses: actions/checkout@v2.1.0

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16

- name: Run build
run: make build_tools

compile:
if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for ${{ matrix.os }} / ${{ matrix.target}}

runs-on: ${{ matrix.os }}
strategy:
matrix:
Expand All @@ -47,8 +67,6 @@ jobs:
- os: ubuntu-latest
target: aarch64-unknown-linux-gnu

- os: windows-latest
target: x86_64-pc-windows-msvc
steps:
- uses: actions/checkout@v2.1.0

Expand All @@ -61,6 +79,7 @@ jobs:
run: make build_tools

compile-freebsd:
if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for FreeBSD job
runs-on: ubuntu-latest
steps:
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/dumpling_integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ on:
- 'util/codec/**'
- 'parser/model/**'

# See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency.
concurrency:
group: ${{ github.ref }}-${{ github.workflow }}
cancel-in-progress: true

jobs:
unit-test:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ linters:
- exportloopref
- rowserrcheck
- unconvert
- makezero

linters-settings:
staticcheck:
checks: ["S1002","S1004","S1007","S1009","S1010","S1012","S1019","S1020","S1021","S1024","S1030","SA2*","SA3*","SA4009","SA5*","SA6000","SA6001","SA6005", "-SA2002"]
Expand All @@ -36,3 +38,4 @@ issues:
- errcheck
- gosec
- rowserrcheck
- makezero
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ explaintest: server_check
ddltest:
@cd cmd/ddltest && $(GO) test -o ../../bin/ddltest -c

upload-coverage: SHELL:=/bin/bash
upload-coverage:
ifeq ("$(TRAVIS_COVERAGE)", "1")
mv overalls.coverprofile coverage.txt
bash <(curl -s https://codecov.io/bash)
ifneq ($(CODECOV_TOKEN), "")
curl -LO ${FILE_SERVER_URL}/download/cicd/ci-tools/codecov
chmod +x codecov
./codecov -t ${CODECOV_TOKEN}
endif

devgotest: failpoint-enable
Expand All @@ -129,7 +129,7 @@ devgotest: failpoint-enable
gotest: failpoint-enable
@echo "Running in native mode."
@export log_level=info; export TZ='Asia/Shanghai'; \
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -check.p true > gotest.log || { $(FAILPOINT_DISABLE); cat 'gotest.log'; exit 1; }
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -coverprofile=coverage.txt -check.p true > gotest.log || { $(FAILPOINT_DISABLE); cat 'gotest.log'; exit 1; }
@$(FAILPOINT_DISABLE)

race: failpoint-enable
Expand Down
5 changes: 2 additions & 3 deletions bindinfo/capture_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,8 @@ func TestCapturedBindingCharset(t *testing.T) {
require.Len(t, rows, 1)
require.Equal(t, "update `test` . `t` set `name` = ? where `name` <= ?", rows[0][0])
require.Equal(t, "UPDATE /*+ use_index(@`upd_1` `test`.`t` `idx`)*/ `test`.`t` SET `name`='hello' WHERE `name` <= 'abc'", rows[0][1])
// Charset and Collation are empty now, they are not used currently.
require.Equal(t, "", rows[0][6])
require.Equal(t, "", rows[0][7])
require.Equal(t, "utf8mb4", rows[0][6])
require.Equal(t, "utf8mb4_bin", rows[0][7])
}

func TestConcurrentCapture(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
defer iter.Close()

for iter.First(); iter.Valid(); iter.Next() {
hasDataConflict = true
rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
if err != nil {
return err
Expand Down Expand Up @@ -525,7 +526,6 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
return err
}
dataConflictInfos = dataConflictInfos[:0]
hasDataConflict = true
return nil
}(); err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -569,6 +569,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
defer iter.Close()

for iter.First(); iter.Valid(); iter.Next() {
hasDataConflict = true
rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
if err != nil {
indexLogger.Error(
Expand Down
13 changes: 7 additions & 6 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type duplicateIter struct {
keyAdapter KeyAdapter
writeBatch *pebble.Batch
writeBatchSize int64
logger log.Logger
}

func (d *duplicateIter) Seek(key []byte) bool {
Expand Down Expand Up @@ -111,11 +112,6 @@ func (d *duplicateIter) record(key []byte, val []byte) {
}

func (d *duplicateIter) Next() bool {
logger := log.With(
zap.String("table", common.UniqueTable(d.engineFile.tableInfo.DB, d.engineFile.tableInfo.Name)),
zap.Int64("tableID", d.engineFile.tableInfo.ID),
zap.Stringer("engineUUID", d.engineFile.UUID))

recordFirst := false
for d.err == nil && d.ctx.Err() == nil && d.iter.Next() {
d.nextKey, _, _, d.err = d.keyAdapter.Decode(d.nextKey[:0], d.iter.Key())
Expand All @@ -128,7 +124,7 @@ func (d *duplicateIter) Next() bool {
d.curVal = append(d.curVal[:0], d.iter.Value()...)
return true
}
logger.Debug("[detect-dupe] local duplicate key detected",
d.logger.Debug("[detect-dupe] local duplicate key detected",
logutil.Key("key", d.curKey),
logutil.Key("prevValue", d.curVal),
logutil.Key("value", d.iter.Value()))
Expand Down Expand Up @@ -182,12 +178,17 @@ func newDuplicateIter(ctx context.Context, engineFile *File, opts *pebble.IterOp
if len(opts.UpperBound) > 0 {
newOpts.UpperBound = codec.EncodeBytes(nil, opts.UpperBound)
}
logger := log.With(
zap.String("table", common.UniqueTable(engineFile.tableInfo.DB, engineFile.tableInfo.Name)),
zap.Int64("tableID", engineFile.tableInfo.ID),
zap.Stringer("engineUUID", engineFile.UUID))
return &duplicateIter{
ctx: ctx,
iter: engineFile.db.NewIter(newOpts),
engineFile: engineFile,
keyAdapter: engineFile.keyAdapter,
writeBatch: engineFile.duplicateDB.NewBatch(),
logger: logger,
}
}

Expand Down
53 changes: 33 additions & 20 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,11 @@ var bufferPool = membuf.NewPool(1024, manual.Allocator{})
func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
// TODO: Optimize the opts for better write.
opts := &pebble.Options{}
opts := &pebble.Options{
TablePropertyCollectors: []func() pebble.TablePropertyCollector{
newRangePropertiesCollector,
},
}
return pebble.Open(dbPath, opts)
}

Expand Down Expand Up @@ -2162,22 +2166,37 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
return err
}

// Collect all duplicating rows from downstream TiDB.
// TODO: what if there are 1,000,000 duplicate rows? need some pagination scheme.
handleRows, err := local.errorMgr.GetConflictKeys(ctx, tableName)
if err != nil {
return err
preRowID := int64(0)
for {
handleRows, lastRowID, err := local.errorMgr.GetConflictKeys(ctx, tableName, preRowID, 1000)
if err != nil {
return errors.Annotate(err, "cannot query conflict keys")
}
if len(handleRows) == 0 {
break
}
if err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder); err != nil {
return errors.Annotate(err, "cannot delete duplicated entries")
}
preRowID = lastRowID
}
return nil
}

func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, handleRows [][2][]byte, decoder *kv.TableKVDecoder) (err error) {
// Starts a Delete transaction.
txn, err := local.tikvCli.Begin()
if err != nil {
return err
}
txn.SetPessimistic(true)
defer func() {
if txn != nil && err != nil {
txn.Rollback()
if err == nil {
err = txn.Commit(ctx)
} else {
if rollbackErr := txn.Rollback(); rollbackErr != nil {
logger.Warn("failed to rollback transaction", zap.Error(rollbackErr))
}
}
}()

Expand Down Expand Up @@ -2210,11 +2229,7 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
}

logger.Info("[resolve-dupe] number of KV pairs to be deleted", zap.Int("count", txn.Len()))

// Commit the transaction.
err = txn.Commit(ctx)
txn = nil
return errors.Annotate(err, "cannot delete duplicated entries")
return nil
}

func (e *File) unfinishedRanges(ranges []Range) []Range {
Expand Down Expand Up @@ -2351,11 +2366,9 @@ func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err
}

func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.CheckCtx) error {
versionStr, err := local.g.GetSQLExecutor().ObtainStringWithLog(
ctx,
"SELECT version();",
"check TiDB version",
log.L())
// TODO: support lightning via SQL
db, _ := local.g.GetDB()
versionStr, err := version.FetchVersion(ctx, db)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -2369,8 +2382,8 @@ func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.Che
return err
}

tidbVersion, _ := version.ExtractTiDBVersion(versionStr)
return checkTiFlashVersion(ctx, local.g, checkCtx, *tidbVersion)
serverInfo := version.ParseServerInfo(versionStr)
return checkTiFlashVersion(ctx, local.g, checkCtx, *serverInfo.ServerVersion)
}

func checkTiDBVersion(_ context.Context, versionStr string, requiredMinVersion, requiredMaxVersion semver.Version) error {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/local_freebsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build freebsd
// +build freebsd

package local
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/local_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package local
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/local_unix_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !freebsd && !windows
// +build !freebsd,!windows

package local
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/local_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build windows
// +build windows

package local
Expand Down
5 changes: 1 addition & 4 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,10 @@ func (s *mysqlSuite) TestWriteRowsErrorDowngrading(c *C) {
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the forth row will exceed the error threshold, won't record this error
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)").
WillReturnResult(driver.ResultNoRows)

ctx := context.Background()
logger := log.L()
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/storage_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

// TODO: Deduplicate this implementation with DM!
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/storage_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build windows
// +build windows

// TODO: Deduplicate this implementation with DM!
Expand Down
Loading

0 comments on commit 2df1e20

Please sign in to comment.