Skip to content

Commit

Permalink
ddl: forbid tiflash while enabling API v2 (#41167)
Browse files Browse the repository at this point in the history
ref #41166
  • Loading branch information
iosmanthus authored Feb 8, 2023
1 parent 37fce43 commit ae60542
Show file tree
Hide file tree
Showing 15 changed files with 58 additions and 23 deletions.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,20 @@ bazel_test: failpoint-enable bazel_ci_prepare


bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage $(BAZEL_CMD_CONFIG) --local_ram_resources=30720 --jobs=25 \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage $(BAZEL_CMD_CONFIG) --local_ram_resources=30720 --jobs=25 \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...

bazel_build: bazel_ci_prepare
mkdir -p bin
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc build $(BAZEL_CMD_CONFIG) --local_ram_resources=61440 --jobs=25 \
//... --//build:with_nogo_flag=true
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc build $(BAZEL_CMD_CONFIG) \
//cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin
cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
Expand Down Expand Up @@ -1736,6 +1737,15 @@ func (rc *Client) PreCheckTableTiFlashReplica(
tables []*metautil.Table,
recorder *tiflashrec.TiFlashRecorder,
) error {
// For TiDB 6.6, we do not support recover TiFlash replica while enabling API V2.
// TODO(iosmanthus): remove this after TiFlash support API V2.
if rc.GetDomain().Store().GetCodec().GetAPIVersion() == kvrpcpb.APIVersion_V2 {
log.Warn("TiFlash does not support API V2, reset replica count to 0")
for _, table := range tables {
table.Info.TiFlashReplica = nil
}
return nil
}
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ go_test(
"//errno",
"//executor",
"//infoschema",
"//keyspace",
"//kv",
"//meta",
"//meta/autoid",
Expand Down
13 changes: 7 additions & 6 deletions ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/gcutil"
Expand Down Expand Up @@ -273,7 +274,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -331,7 +332,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -384,7 +385,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -448,7 +449,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -508,7 +509,7 @@ PARTITION BY RANGE (c) (
func TestDropSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -534,7 +535,7 @@ PARTITION BY RANGE (c) (
func TestDefaultKeyword(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
3 changes: 2 additions & 1 deletion ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/testkit/testsetup"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ go_test(
"//ddl",
"//domain/infosync",
"//errno",
"//keyspace",
"//kv",
"//metrics",
"//parser/ast",
Expand Down
5 changes: 3 additions & 2 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
4 changes: 3 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,9 @@ func (do *Domain) Init(
// step 1: prepare the info/schema syncer which domain reload needed.
pdCli := do.GetPDClient()
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, skipRegisterToDashboard)
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID,
do.etcdClient, do.unprefixedEtcdCli, pdCli, do.Store().GetCodec(),
skipRegisterToDashboard)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ go_library(
"@com_github_gorilla_mux//:mux",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
Expand All @@ -62,6 +64,7 @@ go_test(
deps = [
"//ddl/placement",
"//ddl/util",
"//keyspace",
"//parser/model",
"//testkit/testsetup",
"//util",
Expand Down
8 changes: 5 additions & 3 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/versioninfo"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
Expand Down Expand Up @@ -192,6 +193,7 @@ func GlobalInfoSyncerInit(
serverIDGetter func() uint64,
etcdCli, unprefixedEtcdCli *clientv3.Client,
pdCli pd.Client,
codec tikv.Codec,
skipRegisterToDashBoard bool,
) (*InfoSyncer, error) {
is := &InfoSyncer{
Expand All @@ -208,7 +210,7 @@ func GlobalInfoSyncerInit(
is.labelRuleManager = initLabelRuleManager(etcdCli)
is.placementManager = initPlacementManager(etcdCli)
is.scheduleManager = initScheduleManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec)
is.resourceGroupManager = initResourceGroupManager(pdCli)
setGlobalInfoSyncer(is)
return is, nil
Expand Down Expand Up @@ -261,13 +263,13 @@ func initResourceGroupManager(pdCli pd.Client) pd.ResourceManagerClient {
return pdCli
}

func initTiFlashReplicaManager(etcdCli *clientv3.Client) TiFlashReplicaManager {
func initTiFlashReplicaManager(etcdCli *clientv3.Client, codec tikv.Codec) TiFlashReplicaManager {
if etcdCli == nil {
m := mockTiFlashReplicaManagerCtx{tiflashProgressCache: make(map[int64]float64)}
return &m
}
logutil.BgLogger().Warn("init TiFlashReplicaManager", zap.Strings("pd addrs", etcdCli.Endpoints()))
return &TiFlashReplicaManagerCtx{etcdCli: etcdCli, tiflashProgressCache: make(map[int64]float64)}
return &TiFlashReplicaManagerCtx{etcdCli: etcdCli, tiflashProgressCache: make(map[int64]float64), codec: codec}
}

func initScheduleManager(etcdCli *clientv3.Client) ScheduleManager {
Expand Down
7 changes: 4 additions & 3 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit/testsetup"
util2 "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, keyspace.CodecV1, false)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, false)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -216,7 +217,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, false)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
9 changes: 9 additions & 0 deletions domain/infosync/tiflash_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ import (
"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/pdapi"
"github.com/tikv/client-go/v2/tikv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -77,6 +80,7 @@ type TiFlashReplicaManagerCtx struct {
etcdCli *clientv3.Client
sync.RWMutex // protect tiflashProgressCache
tiflashProgressCache map[int64]float64
codec tikv.Codec
}

// Close is called to close TiFlashReplicaManagerCtx.
Expand Down Expand Up @@ -230,6 +234,11 @@ func (m *TiFlashReplicaManagerCtx) SetTiFlashGroupConfig(ctx context.Context) er

// SetPlacementRule is a helper function to set placement rule.
func (m *TiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error {
// TiDB 6.6 doesn't support tiflash multi-tenancy yet.
// TODO(iosmanthus): remove this check after TiDB supports tiflash multi-tenancy.
if m.codec.GetAPIVersion() == kvrpcpb.APIVersion_V2 {
return errors.Trace(dbterror.ErrNotSupportedYet.GenWithStackByArgs("set TiFlash replica count while enabling API V2"))
}
if err := m.SetTiFlashGroupConfig(ctx); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_test(
"//expression",
"//extension",
"//infoschema",
"//keyspace",
"//kv",
"//meta",
"//metrics",
Expand Down
3 changes: 2 additions & 1 deletion server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/stretchr/testify/require"
Expand All @@ -46,7 +47,7 @@ func TestUptime(t *testing.T) {
}()
require.NoError(t, err)

_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true)
require.NoError(t, err)

tidbdrv := NewTiDBDriver(store)
Expand Down
1 change: 1 addition & 0 deletions statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ go_test(
data = glob(["testdata/**"]),
embed = [":statistics"],
flaky = True,
shard_count = 50,
deps = [
"//config",
"//domain",
Expand Down

0 comments on commit ae60542

Please sign in to comment.