From e11f56caa292638332e0509ddf6d001e41cb3e21 Mon Sep 17 00:00:00 2001 From: zzm Date: Tue, 20 Dec 2022 14:27:46 +0800 Subject: [PATCH] Merge PR #130, #133, #145 into `keyspace on release 6.4` rate limit (#155) * merge pr#130,#133 into 6.4 Signed-off-by: zeminzhou * fix start limit rate Signed-off-by: zeminzhou * default config Signed-off-by: zeminzhou * keyspace: use `user_storage_size` instead of `storage_size` for ratelimit (#145) * use user_storage_size instead of storage_size for ratelimit Signed-off-by: zeminzhou * update golang.org/x/text to 0.3.8 Signed-off-by: zeminzhou * Fix error handle when loadKeyspace failed at startup (#143) Signed-off-by: yongman Signed-off-by: yongman * add keyspace-activate mode (#142) * fix loadkeyspace nil (#144) * fix loadkeyspace nil Signed-off-by: ystaticy * retry load keyspace Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> * address comments Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Signed-off-by: ystaticy Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> * Compatible with old pd Signed-off-by: zeminzhou Signed-off-by: zeminzhou Signed-off-by: yongman Signed-off-by: ystaticy Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: yongman Co-authored-by: better0332 Co-authored-by: ystaticy Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> * make check Signed-off-by: zeminzhou * fix confict Signed-off-by: zeminzhou * make check Signed-off-by: zeminzhou Signed-off-by: zeminzhou Signed-off-by: yongman Signed-off-by: ystaticy Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: yongman Co-authored-by: better0332 Co-authored-by: ystaticy Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> --- br/pkg/conn/BUILD.bazel | 1 + br/pkg/metautil/BUILD.bazel | 1 + br/pkg/restore/BUILD.bazel | 2 + br/pkg/task/BUILD.bazel | 2 + br/pkg/utils/BUILD.bazel | 1 + config/BUILD.bazel | 1 + config/config.go | 4 + config/ratelimit.go | 23 +++++ keyspace/BUILD.bazel | 9 +- keyspace/ratelimit.go | 157 +++++++++++++++++++++++++++++++++ store/driver/txn/BUILD.bazel | 2 + store/driver/txn/txn_driver.go | 60 +++++++++++++ tidb-server/main.go | 34 +++++++ 13 files changed, 296 insertions(+), 1 deletion(-) create mode 100644 config/ratelimit.go create mode 100644 keyspace/ratelimit.go diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index fc88f174394f3..ec732ffc20c1f 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//br/pkg/utils", "//br/pkg/version", "//domain", + "//keyspace", "//kv", "@com_github_docker_go_units//:go-units", "@com_github_opentracing_opentracing_go//:opentracing-go", diff --git a/br/pkg/metautil/BUILD.bazel b/br/pkg/metautil/BUILD.bazel index 97a62414eff19..8973bbfc91dbd 100644 --- a/br/pkg/metautil/BUILD.bazel +++ b/br/pkg/metautil/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 2c42717bafe36..3f9ff4363b4dc 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -47,6 +47,7 @@ go_library( "//ddl", "//ddl/util", "//domain", + "//keyspace", "//kv", "//meta", "//parser/model", @@ -157,6 +158,7 @@ go_test( "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/pdpb", "@com_github_pingcap_kvproto//pkg/recoverdatapb", diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 979afd1ba9110..852836ed6f23e 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -42,6 +42,7 @@ go_library( "//br/pkg/version", "//config", "//ddl", + "//keyspace", "//kv", "//parser/model", "//parser/mysql", @@ -67,6 +68,7 @@ go_library( "@com_github_spf13_pflag//:pflag", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", "@com_google_cloud_go_storage//:storage", "@io_etcd_go_etcd_client_pkg_v3//transport", diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 0ae948d18a779..13c4be5294419 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//br/pkg/logutil", "//br/pkg/metautil", "//errno", + "//keyspace", "//kv", "//parser/model", "//parser/mysql", diff --git a/config/BUILD.bazel b/config/BUILD.bazel index e06843bec3df2..839d2bf3df475 100644 --- a/config/BUILD.bazel +++ b/config/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "config.go", "config_util.go", "const.go", + "ratelimit.go", ], importpath = "github.com/pingcap/tidb/config", visibility = ["//visibility:public"], diff --git a/config/config.go b/config/config.go index 94ef886547de3..b1a0e7d624115 100644 --- a/config/config.go +++ b/config/config.go @@ -295,6 +295,9 @@ type Config struct { TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` // TiDBMaxReuseColumn indicates max cached column num TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"` + + // Ratelimit is used to control the rate limit of the tenant requests. + Ratelimit RatelimitConfig `toml:"ratelimit" json:"ratelimit"` } // UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed @@ -997,6 +1000,7 @@ var defaultConf = Config{ TrxSummary: DefaultTrxSummary(), TiDBMaxReuseChunk: 64, TiDBMaxReuseColumn: 256, + Ratelimit: defaultRatelimitConfig(), } var ( diff --git a/config/ratelimit.go b/config/ratelimit.go new file mode 100644 index 0000000000000..ce121569fa124 --- /dev/null +++ b/config/ratelimit.go @@ -0,0 +1,23 @@ +package config + +// RatelimitConfig contains ratelimit configuration options. +type RatelimitConfig struct { + FullSpeed int `toml:"full-speed" json:"full-speed"` + FullSpeedCapacity int `toml:"full-speed-capacity" json:"full-speed-capacity"` + LowSpeed int `toml:"low-speed" json:"low-speed"` + LowSpeedCapacity int `toml:"low-speed-capacity" json:"low-speed-capacity"` + LowSpeedWatermark int64 `toml:"low-speed-watermark" json:"low-speed-watermark"` + BlockWriteWatermark int64 `toml:"block-write-watermark" json:"block-write-watermark"` +} + +// defaultRatelimitConfig creates a new RatelimitConfig. +func defaultRatelimitConfig() RatelimitConfig { + return RatelimitConfig{ + FullSpeed: 1024 * 1024, // 1MiB/s + FullSpeedCapacity: 10 * 1024 * 1024, // 10MiB + LowSpeed: 1024 * 10, // 10KiB/s + LowSpeedCapacity: 1024 * 1024, // 1MiB + LowSpeedWatermark: 1024 * 1024 * 1024, // 1GiB + BlockWriteWatermark: 2 * 1024 * 1024 * 1024, // 2GiB + } +} diff --git a/keyspace/BUILD.bazel b/keyspace/BUILD.bazel index 32be4ec46afac..a299fc7e0eb76 100644 --- a/keyspace/BUILD.bazel +++ b/keyspace/BUILD.bazel @@ -2,13 +2,20 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "keyspace", - srcs = ["keyspace.go"], + srcs = [ + "keyspace.go", + "ratelimit.go", + ], importpath = "github.com/pingcap/tidb/keyspace", visibility = ["//visibility:public"], deps = [ "//config", "//kv", + "//util", + "//util/codec", + "//util/logutil", "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_zap//:zap", ], ) diff --git a/keyspace/ratelimit.go b/keyspace/ratelimit.go new file mode 100644 index 0000000000000..236df513b4469 --- /dev/null +++ b/keyspace/ratelimit.go @@ -0,0 +1,157 @@ +package keyspace + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "net/http" + "net/url" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type rateLimiter struct { + sync.Mutex + accumulateSpeed int + maxToken int + + token int + lastUpdate int64 + dataSize int64 +} + +func (r *rateLimiter) Usage() int64 { + return atomic.LoadInt64(&r.dataSize) +} + +func (r *rateLimiter) MaxToken() int { + r.Lock() + defer r.Unlock() + return r.maxToken +} + +func (r *rateLimiter) Consume(n int) (bool, time.Duration) { + r.Lock() + defer r.Unlock() + + now := time.Now().Unix() + if now > r.lastUpdate { + r.token += int(now-r.lastUpdate) * r.accumulateSpeed + if r.token > r.maxToken { + r.token = r.maxToken + } + r.lastUpdate = now + } + + if n <= r.token { + r.token -= n + return true, 0 + } + if n > r.maxToken { + return false, 0 + } + return false, time.Duration((n-r.token)/r.accumulateSpeed+1) * time.Second +} + +func (r *rateLimiter) updateSpeed(dataSize int64) { + r.Lock() + defer r.Unlock() + + cfg := config.GetGlobalConfig().Ratelimit + + speed, max := cfg.FullSpeed, cfg.FullSpeedCapacity + if dataSize > cfg.LowSpeedWatermark { + speed, max = cfg.LowSpeed, cfg.LowSpeedCapacity + } + if dataSize > cfg.BlockWriteWatermark { + speed, max = 10, cfg.LowSpeedCapacity/2 // set a minimal value, or tidb-server may fail to start. + } + + if speed != r.accumulateSpeed || max != r.maxToken { + logutil.BgLogger().Info("update rate limit speed", zap.Int("speed", speed), zap.Int("max", max)) + } + + r.accumulateSpeed = speed + r.maxToken = max + if r.token > r.maxToken { + r.token = r.maxToken + } + atomic.StoreInt64(&r.dataSize, dataSize) +} + +func (r *rateLimiter) StartAdjustLimit(pdAddrs []string, keyspaceID uint32) { + r.adjustLimit(pdAddrs, keyspaceID) + go func() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + <-ticker.C + r.adjustLimit(pdAddrs, keyspaceID) + } + }() +} + +// PDRegionStats is the json response from PD. +type PDRegionStats struct { + StorageSize *int64 `json:"storage_size"` + UserStorageSize *int64 `json:"user_storage_size"` +} + +func (r *rateLimiter) adjustLimit(pdAddrs []string, keyspaceID uint32) { + start, end := r.keyspaceRange(keyspaceID) + for _, addr := range pdAddrs { + path := fmt.Sprintf("/pd/api/v1/stats/region?start_key=%s&end_key=%s", url.QueryEscape(string(start)), url.QueryEscape(string(end))) + res, err := util.InternalHTTPClient().Get(util.ComposeURL(addr, path)) + if err != nil { + logutil.BgLogger().Warn("get region stats failed", zap.String("pd", addr), zap.Error(err)) + continue + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + logutil.BgLogger().Warn("get region stats failed", zap.String("pd", addr), zap.Int("status", res.StatusCode)) + continue + } + var stats PDRegionStats + err = json.NewDecoder(res.Body).Decode(&stats) + if err != nil { + logutil.BgLogger().Warn("decode region stats failed", zap.String("pd", addr), zap.Error(err)) + continue + } + + var userStorageSize int64 + if stats.UserStorageSize != nil { + userStorageSize = *(stats.UserStorageSize) + } else if stats.StorageSize != nil { + userStorageSize = *(stats.StorageSize) + } + r.updateSpeed(userStorageSize * 1024 * 1024) // storage size unit is MiB. + return + } + logutil.BgLogger().Error("get region stats failed from all PDs") +} + +func (r *rateLimiter) keyspaceRange(id uint32) ([]byte, []byte) { + var start, end [4]byte + binary.BigEndian.PutUint32(start[:], id) + binary.BigEndian.PutUint32(end[:], id+1) + start[0], end[0] = 'x', 'x' // we only care about txn data. + if id == 0xffffff { + end[0] = 'x' + 1 // handle overflow for max keyspace id. + } + return codec.EncodeBytes(nil, start[:]), codec.EncodeBytes(nil, end[:]) +} + +// Limiter is an instance of rateLimiter +var Limiter = &rateLimiter{ + accumulateSpeed: 10 * 1024, + maxToken: 1024 * 1024, + token: 1024 * 1024, + lastUpdate: time.Now().Unix(), +} diff --git a/store/driver/txn/BUILD.bazel b/store/driver/txn/BUILD.bazel index ab10c88f12c9c..5bd47e7ea09ff 100644 --- a/store/driver/txn/BUILD.bazel +++ b/store/driver/txn/BUILD.bazel @@ -15,6 +15,8 @@ go_library( importpath = "github.com/pingcap/tidb/store/driver/txn", visibility = ["//visibility:public"], deps = [ + "//config", + "//keyspace", "//kv", "//parser/model", "//parser/mysql", diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 851e68eac89ef..c25af083a4b0c 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -16,12 +16,16 @@ package txn import ( "context" + "fmt" "sync/atomic" + "time" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/binloginfo" @@ -76,8 +80,64 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput return txn.extractKeyErr(err) } +type size interface { + int64 | int +} + +func toGiB[T size](size T) int { + return int(size / (1024 * 1024 * 1024)) +} + +func toMiB[T size](size T) int { + return int(size / (1024 * 1024)) +} + +func toKiB[T size](size T) int { + return int(size / 1024) +} + +func humanReadable[T size](size T) string { + if size < 1024 { + return fmt.Sprintf("%d B", size) + } else if size < 1024*1024 { + return fmt.Sprintf("%d KiB", toKiB(size)) + } else if size < 1024*1024*1024 { + return fmt.Sprintf("%d MiB", toMiB(size)) + } else { + return fmt.Sprintf("%d GiB", toGiB(size)) + } +} + func (txn *tikvTxn) Commit(ctx context.Context) error { err := txn.KVTxn.Commit(ctx) + if keyspace.GetKeyspaceNameBySettings() != "" && txn.GetDiskFullOpt() == kvrpcpb.DiskFullOpt_NotAllowedOnFull { + txnSize := txn.GetUnionStore().GetMemBuffer().Size() + for txnSize > 0 { + ok, wait := keyspace.Limiter.Consume(txnSize) + if ok { + break + } + if wait == time.Duration(0) { + usage := keyspace.Limiter.Usage() + txnLimit := fmt.Sprintf( + "You have reached max transaction limit %s of Serverless Tier cluster", + humanReadable(keyspace.Limiter.MaxToken())) + cfg := config.GetGlobalConfig().Ratelimit + if usage < cfg.LowSpeedWatermark { + return errors.New(txnLimit + ".") + } + if usage < cfg.BlockWriteWatermark { + return errors.New(fmt.Sprintf("%s when total used data %s is more than soft "+ + "limit %s at beta stage.", txnLimit, humanReadable(usage), humanReadable(cfg.LowSpeedCapacity))) + } + return errors.New(fmt.Sprintf("%s when total used data %s is more than twice of "+ + "soft limit %s at beta stage. Please delete some of your data to reclaim spaces. "+ + "Be aware that deleting data is also throttled and can be slow.", + txnLimit, humanReadable(usage), humanReadable(cfg.LowSpeedWatermark))) + } + time.Sleep(wait + time.Millisecond*10) + } + } return txn.extractKeyErr(err) } diff --git a/tidb-server/main.go b/tidb-server/main.go index 44448af16dadc..d7f651174cf47 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -297,8 +297,12 @@ func main() { mainErrHandler(err) svr := createServer(storage, dom) + err = startRateLimit() + terror.MustNil(err) + session.RunBootstrapSQL(storage) + // Register error API is not thread-safe, the caller MUST NOT register errors after initialization. // To prevent misuse, set a flag to indicate that register new error will panic immediately. // For regression of issue like https://github.com/pingcap/tidb/issues/28190 @@ -385,6 +389,36 @@ func registerStores() { terror.MustNil(err) } +func startRateLimit() error { + // load keyspace and set metric labels. + cfg := config.GetGlobalConfig() + if strings.ToLower(cfg.Store) == "tikv" { + etcdAddrs, _, _, err := tikvconfig.ParsePath("tikv://" + cfg.Path) + if err != nil { + return err + } + pdCli, err := pd.NewClient(etcdAddrs, pd.SecurityOption{ + CAPath: cfg.Security.ClusterSSLCA, + CertPath: cfg.Security.ClusterSSLCert, + KeyPath: cfg.Security.ClusterSSLKey, + }, + pd.WithCustomTimeoutOption(time.Duration(cfg.PDClient.PDServerTimeout)*time.Second), + ) + if err != nil { + return err + } + defer pdCli.Close() + + log.Info("start rate limit") + keyspaceMeta, err := pdCli.LoadKeyspace(context.TODO(), cfg.KeyspaceName) + if err != nil { + return err + } + keyspace.Limiter.StartAdjustLimit(etcdAddrs, keyspaceMeta.Id) + } + return nil +} + func registerMetrics() { metrics.RegisterMetrics() if config.GetGlobalConfig().Store == "unistore" {