Skip to content

Commit

Permalink
Merge branch 'master' into refactor_plan_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jul 28, 2022
2 parents 22f18bb + 2c3d4f1 commit 1eb5094
Show file tree
Hide file tree
Showing 118 changed files with 1,382 additions and 935 deletions.
16 changes: 0 additions & 16 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ goword:tools/bin/goword
check-static: tools/bin/golangci-lint
GO111MODULE=on CGO_ENABLED=0 tools/bin/golangci-lint run -v $$($(PACKAGE_DIRECTORIES)) --config .golangci.yml

unconvert:tools/bin/unconvert
@echo "unconvert check(skip check the generated or copied code in lightning)"
@GO111MODULE=on tools/bin/unconvert $(UNCONVERT_PACKAGES)

gogenerate:
@echo "go generate ./..."
./tools/check/check-gogenerate.sh
Expand Down Expand Up @@ -211,22 +207,10 @@ tools/bin/xprog: tools/check/xprog.go
cd tools/check; \
$(GO) build -o ../bin/xprog xprog.go

tools/bin/megacheck: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/megacheck honnef.co/go/tools/cmd/megacheck

tools/bin/revive: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/revive github.com/mgechev/revive

tools/bin/goword: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/goword github.com/chzchzchz/goword

tools/bin/unconvert: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/unconvert github.com/mdempsky/unconvert

tools/bin/failpoint-ctl: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl
Expand Down
8 changes: 4 additions & 4 deletions bindinfo/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestBindingCache(t *testing.T) {
}

func TestBindingLastUpdateTime(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestBindingLastUpdateTime(t *testing.T) {
}

func TestBindingLastUpdateTimeWithInvalidBind(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
Expand All @@ -125,7 +125,7 @@ func TestBindingLastUpdateTimeWithInvalidBind(t *testing.T) {
}

func TestBindParse(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestEvolveInvalidBindings(t *testing.T) {
}

func TestSetBindingStatus(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ go_library(
"//meta",
"//meta/autoid",
"//parser/model",
"//sessionctx",
"//statistics/handle",
"//util",
"//util/codec",
Expand Down Expand Up @@ -67,6 +66,7 @@ go_test(
flaky = True,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb",
"//br/pkg/metautil",
"//br/pkg/mock",
"//br/pkg/pdutil",
Expand Down
7 changes: 3 additions & 4 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn"
connutil "github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand Down Expand Up @@ -615,7 +616,7 @@ func (bc *Client) BackupRange(
zap.Uint32("concurrency", req.Concurrency))

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -676,9 +677,7 @@ func (bc *Client) BackupRange(
func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
if !isRawKv {
key = codec.EncodeBytes([]byte{}, key)
}
key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
Expand Down
65 changes: 7 additions & 58 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/engine"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
Expand Down Expand Up @@ -68,68 +68,17 @@ type Mgr struct {
*utils.StoreManager
}

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
// store (e.g. TiFlash store) is found.
type StoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash StoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash StoreBehavior = 1
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
// TiFlash node.
TiFlashOnly StoreBehavior = 2
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}

// filter out all stores which are TiFlash.
j := 0
for _, store := range stores {
isTiFlash := false
if engine.IsTiFlash(store) {
if storeBehavior == SkipTiFlash {
continue
} else if storeBehavior == ErrorOnTiFlash {
return nil, errors.Annotatef(berrors.ErrPDInvalidResponse,
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
isTiFlash = true
}
if !isTiFlash && storeBehavior == TiFlashOnly {
continue
}
stores[j] = store
j++
}
return stores[:j], nil
}

func GetAllTiKVStoresWithRetry(ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
storeBehavior util.StoreBehavior,
) ([]*metapb.Store, error) {
stores := make([]*metapb.Store, 0)
var err error

errRetry := utils.WithRetry(
ctx,
func() error {
stores, err = GetAllTiKVStores(ctx, pdClient, storeBehavior)
stores, err = util.GetAllTiKVStores(ctx, pdClient, storeBehavior)
failpoint.Inject("hint-GetAllTiKVStores-error", func(val failpoint.Value) {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.")
Expand All @@ -154,9 +103,9 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,

func checkStoresAlive(ctx context.Context,
pdclient pd.Client,
storeBehavior StoreBehavior) error {
storeBehavior util.StoreBehavior) error {
// Check live tikv.
stores, err := GetAllTiKVStores(ctx, pdclient, storeBehavior)
stores, err := util.GetAllTiKVStores(ctx, pdclient, storeBehavior)
if err != nil {
log.Error("fail to get store", zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -184,7 +133,7 @@ func NewMgr(
tlsConf *tls.Config,
securityOption pd.SecurityOption,
keepalive keepalive.ClientParameters,
storeBehavior StoreBehavior,
storeBehavior util.StoreBehavior,
checkRequirements bool,
needDomain bool,
versionCheckerType VersionCheckerType,
Expand Down Expand Up @@ -368,7 +317,7 @@ func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Cli

// GetConfigFromTiKV get configs from all alive tikv stores.
func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error {
allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), SkipTiFlash)
allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
27 changes: 14 additions & 13 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) {
Stores: stores,
}

_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
require.Equal(t, codes.Canceled, status.Code(errors.Cause(err)))
}
Expand Down Expand Up @@ -96,7 +97,7 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) {
Stores: stores,
}

_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
require.Equal(t, codes.Unknown, status.Code(errors.Cause(err)))
}
Expand Down Expand Up @@ -151,50 +152,50 @@ func TestCheckStoresAlive(t *testing.T) {
Stores: stores,
}

kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
require.Len(t, kvStores, 2)
require.Equal(t, stores[2:], kvStores)

err = checkStoresAlive(ctx, fpdc, SkipTiFlash)
err = checkStoresAlive(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
}

func TestGetAllTiKVStores(t *testing.T) {
testCases := []struct {
stores []*metapb.Store
storeBehavior StoreBehavior
storeBehavior util.StoreBehavior
expectedStores map[uint64]int
expectedError string
}{
{
stores: []*metapb.Store{
{Id: 1},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedError: "^cannot restore to a cluster with active TiFlash stores",
},
{
Expand All @@ -206,7 +207,7 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1, 3: 1, 4: 1, 6: 1},
},
{
Expand All @@ -218,7 +219,7 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedError: "^cannot restore to a cluster with active TiFlash stores",
},
{
Expand All @@ -230,14 +231,14 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: TiFlashOnly,
storeBehavior: util.TiFlashOnly,
expectedStores: map[uint64]int{2: 1, 5: 1},
},
}

for _, testCase := range testCases {
pdClient := utils.FakePDClient{Stores: testCase.stores}
stores, err := GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior)
stores, err := util.GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior)
if len(testCase.expectedError) != 0 {
require.Error(t, err)
require.Regexp(t, testCase.expectedError, err.Error())
Expand Down
Loading

0 comments on commit 1eb5094

Please sign in to comment.