Skip to content

Commit

Permalink
keyspace: introduce keyspace conf and etcd path (#40269)
Browse files Browse the repository at this point in the history
ref #40425
  • Loading branch information
ystaticy authored Jan 10, 2023
1 parent 02332b2 commit 10f0093
Show file tree
Hide file tree
Showing 24 changed files with 244 additions and 21 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3582,8 +3582,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:cPtMXTExqjzk8L40qhrgB/mXiBXKP5LRU0vwjtI2Xxo=",
version = "v2.0.4",
sum = "h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=",
version = "v2.0.5-0.20230110071533-f313ddf58d73",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
6 changes: 4 additions & 2 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@
"parser/": "parser code",
"meta/": "parser code",
"extension/": "extension code",
"resourcemanager/": "resourcemanager code"
"resourcemanager/": "resourcemanager code",
"keyspace": "keyspace code"
}
},
"shift": {
Expand Down Expand Up @@ -767,7 +768,8 @@
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
"extension/": "extension code",
"resourcemanager/": "resourcemanager code"
"resourcemanager/": "resourcemanager code",
"keyspace/": "keyspace code"
}
},
"SA2000": {
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ const (
DefTempDir = "/tmp/tidb"
// DefAuthTokenRefreshInterval is the default time interval to refresh tidb auth token.
DefAuthTokenRefreshInterval = time.Hour
// EnvVarKeyspaceName is the system env name for keyspace name.
EnvVarKeyspaceName = "KEYSPACE_NAME"
)

// Valid config maps
Expand Down Expand Up @@ -183,6 +185,7 @@ type Config struct {
VersionComment string `toml:"version-comment" json:"version-comment"`
TiDBEdition string `toml:"tidb-edition" json:"tidb-edition"`
TiDBReleaseVersion string `toml:"tidb-release-version" json:"tidb-release-version"`
KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"`
Log Log `toml:"log" json:"log"`
Instance Instance `toml:"instance" json:"instance"`
Security Security `toml:"security" json:"security"`
Expand Down Expand Up @@ -1457,3 +1460,9 @@ func ContainHiddenConfig(s string) bool {
}
return false
}

// GetGlobalKeyspaceName is used to get global keyspace name
// from config file or command line.
func GetGlobalKeyspaceName() string {
return GetGlobalConfig().KeyspaceName
}
15 changes: 15 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,3 +1288,18 @@ func TestStatsLoadLimit(t *testing.T) {
checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit, true)
checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit+1, false)
}

func TestGetGlobalKeyspaceName(t *testing.T) {
conf := NewConfig()
require.Empty(t, conf.KeyspaceName)

UpdateGlobal(func(conf *Config) {
conf.KeyspaceName = "test"
})

require.Equal(t, "test", GetGlobalKeyspaceName())

UpdateGlobal(func(conf *Config) {
conf.KeyspaceName = ""
})
}
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//errno",
"//infoschema",
"//infoschema/perfschema",
"//keyspace",
"//kv",
"//meta",
"//metrics",
Expand All @@ -55,6 +56,7 @@ go_library(
"//util/dbterror",
"//util/domainutil",
"//util/engine",
"//util/etcd",
"//util/execdetails",
"//util/expensivequery",
"//util/logutil",
Expand Down
5 changes: 5 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/infoschema/perfschema"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
Expand All @@ -65,6 +66,7 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -965,6 +967,9 @@ func (do *Domain) Init(
if err != nil {
return errors.Trace(err)
}

etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))

do.etcdClient = cli
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.4
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -936,8 +936,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.4 h1:cPtMXTExqjzk8L40qhrgB/mXiBXKP5LRU0vwjtI2Xxo=
github.com/tikv/client-go/v2 v2.0.4/go.mod h1:v52O5zDtv2BBus4lm5yrSQhxGW4Z4RaXWfg0U1Kuyqo=
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73 h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73/go.mod h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
12 changes: 12 additions & 0 deletions keyspace/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "keyspace",
srcs = ["keyspace.go"],
importpath = "github.com/pingcap/tidb/keyspace",
visibility = ["//visibility:public"],
deps = [
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_tikv_client_go_v2//tikv",
],
)
38 changes: 38 additions & 0 deletions keyspace/keyspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package keyspace

import (
"fmt"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/tikv"
)

const (
// tidbKeyspaceEtcdPathPrefix is the keyspace prefix for etcd namespace
tidbKeyspaceEtcdPathPrefix = "/keyspaces/tidb/"
)

// CodecV1 represents api v1 codec.
var CodecV1 = tikv.NewCodecV1(tikv.ModeTxn)

// MakeKeyspaceEtcdNamespace return the keyspace prefix path for etcd namespace
func MakeKeyspaceEtcdNamespace(c tikv.Codec) string {
if c.GetAPIVersion() == kvrpcpb.APIVersion_V1 {
return ""
}
return fmt.Sprintf(tidbKeyspaceEtcdPathPrefix+"%d", c.GetKeyspaceID())
}
4 changes: 4 additions & 0 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func newMockTxn() Transaction {
// mockStorage is used to start a must commit-failed txn.
type mockStorage struct{}

func (s *mockStorage) GetCodec() tikv.Codec {
return nil
}

func (s *mockStorage) Begin(opts ...tikv.TxnOption) (Transaction, error) {
return newMockTxn(), nil
}
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,8 @@ type Storage interface {
GetMinSafeTS(txnScope string) uint64
// GetLockWaits return all lock wait information
GetLockWaits() ([]*deadlockpb.WaitForEntry, error)
// GetCodec gets the codec of the storage.
GetCodec() tikv.Codec
}

// EtcdBackend is used for judging a storage is a real TiKV.
Expand Down
8 changes: 7 additions & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,14 @@ func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) {
grpcServer := NewRPCServer(s.cfg, s.dom, s)
service.RegisterChannelzServiceToServer(grpcServer)
if s.cfg.Store == "tikv" {
keyspaceName := config.GetGlobalKeyspaceName()
for {
fullPath := fmt.Sprintf("tikv://%s", s.cfg.Path)
var fullPath string
if keyspaceName == "" {
fullPath = fmt.Sprintf("%s://%s", s.cfg.Store, s.cfg.Path)
} else {
fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", s.cfg.Store, s.cfg.Path, keyspaceName)
}
store, err := store.New(fullPath)
if err != nil {
logutil.BgLogger().Error("new tikv store fail", zap.Error(err))
Expand Down
1 change: 1 addition & 0 deletions store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//util",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@org_uber_go_zap//:zap",
],
)
Expand Down
12 changes: 6 additions & 6 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestBuildTasksWithoutBuckets(t *testing.T) {
}()

_, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
pdCli := &tikv.CodecPDClient{Client: pdClient}
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
cluster.SplitRegionBuckets(regionIDs[0], [][]byte{{}, {'c'}, {'g'}, {'k'}, {'n'}}, regionIDs[0])
cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'t'}, {'x'}}, regionIDs[1])
cluster.SplitRegionBuckets(regionIDs[2], [][]byte{{'x'}, {}}, regionIDs[2])
pdCli := &tikv.CodecPDClient{Client: pdClient}
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestSplitRegionRanges(t *testing.T) {
}()

testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
pdCli := &tikv.CodecPDClient{Client: pdClient}
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestRebuild(t *testing.T) {
}()

storeID, regionIDs, peerIDs := testutils.BootstrapWithMultiRegions(cluster, []byte("m"))
pdCli := &tikv.CodecPDClient{Client: pdClient}
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()
cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestBuildPagingTasks(t *testing.T) {
}()

_, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
pdCli := &tikv.CodecPDClient{Client: pdClient}
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
Expand Down Expand Up @@ -667,7 +667,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
require.NoError(t, err)
}()
_, _, _ = testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
pdCli := &tikv.CodecPDClient{Client: pdClient}
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()
cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()
Expand Down
41 changes: 38 additions & 3 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func WithPDClientConfig(client config.PDClient) Option {

// TiKVDriver implements engine TiKV.
type TiKVDriver struct {
keyspaceName string
pdConfig config.PDClient
security config.Security
tikvConfig config.TiKVClient
Expand Down Expand Up @@ -117,7 +118,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage,
mc.Lock()
defer mc.Unlock()
d.setDefaultAndOptions(options...)
etcdAddrs, disableGC, err := config.ParsePath(path)
etcdAddrs, disableGC, keyspaceName, err := config.ParsePath(path)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -157,11 +158,39 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage,
return nil, errors.Trace(err)
}

pdClient := tikv.CodecPDClient{Client: pdCli}
s, err := tikv.NewKVStore(uuid, &pdClient, spkv, tikv.NewRPCClient(tikv.WithSecurity(d.security)))
// ---------------- keyspace logic ----------------
var (
pdClient *tikv.CodecPDClient
)

if keyspaceName == "" {
logutil.BgLogger().Info("using API V1.")
pdClient = tikv.NewCodecPDClient(tikv.ModeTxn, pdCli)
} else {
logutil.BgLogger().Info("using API V2.", zap.String("keyspaceName", keyspaceName))
pdClient, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdCli, keyspaceName)
if err != nil {
return nil, errors.Trace(err)
}
// If there's setting keyspace-name, then skipped GC worker logic.
// It needs a group of special tidb nodes to execute GC worker logic.
// TODO: remove this restriction while merged keyspace GC worker logic.
disableGC = true
}

codec := pdClient.GetCodec()

rpcClient := tikv.NewRPCClient(
tikv.WithSecurity(d.security),
tikv.WithCodec(codec),
)

s, err := tikv.NewKVStore(uuid, pdClient, spkv, rpcClient)
if err != nil {
return nil, errors.Trace(err)
}

// ---------------- keyspace logic ----------------
if d.txnLocalLatches.Enabled {
s.EnableTxnLocalLatches(d.txnLocalLatches.Capacity)
}
Expand All @@ -178,6 +207,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage,
memCache: kv.NewCacheDB(),
enableGC: !disableGC,
coprStore: coprStore,
codec: codec,
}

mc.cache[uuid] = store
Expand All @@ -192,6 +222,7 @@ type tikvStore struct {
enableGC bool
gcWorker *gcworker.GCWorker
coprStore *copr.Store
codec tikv.Codec
}

// Name gets the name of the storage engine
Expand Down Expand Up @@ -343,3 +374,7 @@ func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
}
return result, nil
}

func (s *tikvStore) GetCodec() tikv.Codec {
return s.codec
}
1 change: 1 addition & 0 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Storage interface {
Closed() <-chan struct{}
GetMinSafeTS(txnScope string) uint64
GetLockWaits() ([]*deadlockpb.WaitForEntry, error)
GetCodec() tikv.Codec
}

// Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table.
Expand Down
6 changes: 6 additions & 0 deletions store/mockstore/mockstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ func (s *mockStorage) Close() error {
return s.KVStore.Close()
}

func (s *mockStorage) GetCodec() tikv.Codec {
pdClient := s.KVStore.GetPDClient()
pdCodecCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
return pdCodecCli.GetCodec()
}

// MockLockWaitSetter is used to set the mocked lock wait information, which helps implementing tests that uses the
// GetLockWaits function.
type MockLockWaitSetter interface {
Expand Down
Loading

0 comments on commit 10f0093

Please sign in to comment.