Skip to content

Commit

Permalink
Merge PR pingcap#130, pingcap#133, pingcap#145 into `keyspace on rele…
Browse files Browse the repository at this point in the history
…ase 6.4` rate limit (pingcap#155)

* merge pr#130,pingcap#133 into 6.4

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix start limit rate

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* default config

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* keyspace: use `user_storage_size` instead of `storage_size` for ratelimit (pingcap#145)

* use user_storage_size instead of storage_size for ratelimit

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* update golang.org/x/text to 0.3.8

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* Fix error handle when loadKeyspace failed at startup (pingcap#143)

Signed-off-by: yongman <yming0221@gmail.com>

Signed-off-by: yongman <yming0221@gmail.com>

* add keyspace-activate mode (pingcap#142)

* fix loadkeyspace nil (pingcap#144)

* fix loadkeyspace nil

Signed-off-by: ystaticy <y_static_y@sina.com>

* retry load keyspace

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* address comments

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

Signed-off-by: ystaticy <y_static_y@sina.com>
Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* Compatible with old pd

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
Signed-off-by: yongman <yming0221@gmail.com>
Signed-off-by: ystaticy <y_static_y@sina.com>
Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
Co-authored-by: yongman <yming0221@gmail.com>
Co-authored-by: better0332 <better0332@163.com>
Co-authored-by: ystaticy <y_static_y@sina.com>
Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* make check

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix confict

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* make check

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
Signed-off-by: yongman <yming0221@gmail.com>
Signed-off-by: ystaticy <y_static_y@sina.com>
Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
Co-authored-by: yongman <yming0221@gmail.com>
Co-authored-by: better0332 <better0332@163.com>
Co-authored-by: ystaticy <y_static_y@sina.com>
Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
  • Loading branch information
5 people authored Dec 20, 2022
1 parent ca134e7 commit e11f56c
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 1 deletion.
1 change: 1 addition & 0 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//br/pkg/utils",
"//br/pkg/version",
"//domain",
"//keyspace",
"//kv",
"@com_github_docker_go_units//:go-units",
"@com_github_opentracing_opentracing_go//:opentracing-go",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/metautil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//:zap",
],
)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//ddl",
"//ddl/util",
"//domain",
"//keyspace",
"//kv",
"//meta",
"//parser/model",
Expand Down Expand Up @@ -157,6 +158,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_kvproto//pkg/recoverdatapb",
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//br/pkg/version",
"//config",
"//ddl",
"//keyspace",
"//kv",
"//parser/model",
"//parser/mysql",
Expand All @@ -67,6 +68,7 @@ go_library(
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@com_google_cloud_go_storage//:storage",
"@io_etcd_go_etcd_client_pkg_v3//transport",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/metautil",
"//errno",
"//keyspace",
"//kv",
"//parser/model",
"//parser/mysql",
Expand Down
1 change: 1 addition & 0 deletions config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"config.go",
"config_util.go",
"const.go",
"ratelimit.go",
],
importpath = "github.com/pingcap/tidb/config",
visibility = ["//visibility:public"],
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ type Config struct {
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
// TiDBMaxReuseColumn indicates max cached column num
TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"`

// Ratelimit is used to control the rate limit of the tenant requests.
Ratelimit RatelimitConfig `toml:"ratelimit" json:"ratelimit"`
}

// UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
Expand Down Expand Up @@ -997,6 +1000,7 @@ var defaultConf = Config{
TrxSummary: DefaultTrxSummary(),
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
Ratelimit: defaultRatelimitConfig(),
}

var (
Expand Down
23 changes: 23 additions & 0 deletions config/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package config

// RatelimitConfig contains ratelimit configuration options.
type RatelimitConfig struct {
FullSpeed int `toml:"full-speed" json:"full-speed"`
FullSpeedCapacity int `toml:"full-speed-capacity" json:"full-speed-capacity"`
LowSpeed int `toml:"low-speed" json:"low-speed"`
LowSpeedCapacity int `toml:"low-speed-capacity" json:"low-speed-capacity"`
LowSpeedWatermark int64 `toml:"low-speed-watermark" json:"low-speed-watermark"`
BlockWriteWatermark int64 `toml:"block-write-watermark" json:"block-write-watermark"`
}

// defaultRatelimitConfig creates a new RatelimitConfig.
func defaultRatelimitConfig() RatelimitConfig {
return RatelimitConfig{
FullSpeed: 1024 * 1024, // 1MiB/s
FullSpeedCapacity: 10 * 1024 * 1024, // 10MiB
LowSpeed: 1024 * 10, // 10KiB/s
LowSpeedCapacity: 1024 * 1024, // 1MiB
LowSpeedWatermark: 1024 * 1024 * 1024, // 1GiB
BlockWriteWatermark: 2 * 1024 * 1024 * 1024, // 2GiB
}
}
9 changes: 8 additions & 1 deletion keyspace/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "keyspace",
srcs = ["keyspace.go"],
srcs = [
"keyspace.go",
"ratelimit.go",
],
importpath = "github.com/pingcap/tidb/keyspace",
visibility = ["//visibility:public"],
deps = [
"//config",
"//kv",
"//util",
"//util/codec",
"//util/logutil",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//:zap",
],
)

Expand Down
157 changes: 157 additions & 0 deletions keyspace/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package keyspace

import (
"encoding/binary"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

type rateLimiter struct {
sync.Mutex
accumulateSpeed int
maxToken int

token int
lastUpdate int64
dataSize int64
}

func (r *rateLimiter) Usage() int64 {
return atomic.LoadInt64(&r.dataSize)
}

func (r *rateLimiter) MaxToken() int {
r.Lock()
defer r.Unlock()
return r.maxToken
}

func (r *rateLimiter) Consume(n int) (bool, time.Duration) {
r.Lock()
defer r.Unlock()

now := time.Now().Unix()
if now > r.lastUpdate {
r.token += int(now-r.lastUpdate) * r.accumulateSpeed
if r.token > r.maxToken {
r.token = r.maxToken
}
r.lastUpdate = now
}

if n <= r.token {
r.token -= n
return true, 0
}
if n > r.maxToken {
return false, 0
}
return false, time.Duration((n-r.token)/r.accumulateSpeed+1) * time.Second
}

func (r *rateLimiter) updateSpeed(dataSize int64) {
r.Lock()
defer r.Unlock()

cfg := config.GetGlobalConfig().Ratelimit

speed, max := cfg.FullSpeed, cfg.FullSpeedCapacity
if dataSize > cfg.LowSpeedWatermark {
speed, max = cfg.LowSpeed, cfg.LowSpeedCapacity
}
if dataSize > cfg.BlockWriteWatermark {
speed, max = 10, cfg.LowSpeedCapacity/2 // set a minimal value, or tidb-server may fail to start.
}

if speed != r.accumulateSpeed || max != r.maxToken {
logutil.BgLogger().Info("update rate limit speed", zap.Int("speed", speed), zap.Int("max", max))
}

r.accumulateSpeed = speed
r.maxToken = max
if r.token > r.maxToken {
r.token = r.maxToken
}
atomic.StoreInt64(&r.dataSize, dataSize)
}

func (r *rateLimiter) StartAdjustLimit(pdAddrs []string, keyspaceID uint32) {
r.adjustLimit(pdAddrs, keyspaceID)
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
<-ticker.C
r.adjustLimit(pdAddrs, keyspaceID)
}
}()
}

// PDRegionStats is the json response from PD.
type PDRegionStats struct {
StorageSize *int64 `json:"storage_size"`
UserStorageSize *int64 `json:"user_storage_size"`
}

func (r *rateLimiter) adjustLimit(pdAddrs []string, keyspaceID uint32) {
start, end := r.keyspaceRange(keyspaceID)
for _, addr := range pdAddrs {
path := fmt.Sprintf("/pd/api/v1/stats/region?start_key=%s&end_key=%s", url.QueryEscape(string(start)), url.QueryEscape(string(end)))
res, err := util.InternalHTTPClient().Get(util.ComposeURL(addr, path))
if err != nil {
logutil.BgLogger().Warn("get region stats failed", zap.String("pd", addr), zap.Error(err))
continue
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
logutil.BgLogger().Warn("get region stats failed", zap.String("pd", addr), zap.Int("status", res.StatusCode))
continue
}
var stats PDRegionStats
err = json.NewDecoder(res.Body).Decode(&stats)
if err != nil {
logutil.BgLogger().Warn("decode region stats failed", zap.String("pd", addr), zap.Error(err))
continue
}

var userStorageSize int64
if stats.UserStorageSize != nil {
userStorageSize = *(stats.UserStorageSize)
} else if stats.StorageSize != nil {
userStorageSize = *(stats.StorageSize)
}
r.updateSpeed(userStorageSize * 1024 * 1024) // storage size unit is MiB.
return
}
logutil.BgLogger().Error("get region stats failed from all PDs")
}

func (r *rateLimiter) keyspaceRange(id uint32) ([]byte, []byte) {
var start, end [4]byte
binary.BigEndian.PutUint32(start[:], id)
binary.BigEndian.PutUint32(end[:], id+1)
start[0], end[0] = 'x', 'x' // we only care about txn data.
if id == 0xffffff {
end[0] = 'x' + 1 // handle overflow for max keyspace id.
}
return codec.EncodeBytes(nil, start[:]), codec.EncodeBytes(nil, end[:])
}

// Limiter is an instance of rateLimiter
var Limiter = &rateLimiter{
accumulateSpeed: 10 * 1024,
maxToken: 1024 * 1024,
token: 1024 * 1024,
lastUpdate: time.Now().Unix(),
}
2 changes: 2 additions & 0 deletions store/driver/txn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
importpath = "github.com/pingcap/tidb/store/driver/txn",
visibility = ["//visibility:public"],
deps = [
"//config",
"//keyspace",
"//kv",
"//parser/model",
"//parser/mysql",
Expand Down
60 changes: 60 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package txn

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/binloginfo"
Expand Down Expand Up @@ -76,8 +80,64 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
return txn.extractKeyErr(err)
}

type size interface {
int64 | int
}

func toGiB[T size](size T) int {
return int(size / (1024 * 1024 * 1024))
}

func toMiB[T size](size T) int {
return int(size / (1024 * 1024))
}

func toKiB[T size](size T) int {
return int(size / 1024)
}

func humanReadable[T size](size T) string {
if size < 1024 {
return fmt.Sprintf("%d B", size)
} else if size < 1024*1024 {
return fmt.Sprintf("%d KiB", toKiB(size))
} else if size < 1024*1024*1024 {
return fmt.Sprintf("%d MiB", toMiB(size))
} else {
return fmt.Sprintf("%d GiB", toGiB(size))
}
}

func (txn *tikvTxn) Commit(ctx context.Context) error {
err := txn.KVTxn.Commit(ctx)
if keyspace.GetKeyspaceNameBySettings() != "" && txn.GetDiskFullOpt() == kvrpcpb.DiskFullOpt_NotAllowedOnFull {
txnSize := txn.GetUnionStore().GetMemBuffer().Size()
for txnSize > 0 {
ok, wait := keyspace.Limiter.Consume(txnSize)
if ok {
break
}
if wait == time.Duration(0) {
usage := keyspace.Limiter.Usage()
txnLimit := fmt.Sprintf(
"You have reached max transaction limit %s of Serverless Tier cluster",
humanReadable(keyspace.Limiter.MaxToken()))
cfg := config.GetGlobalConfig().Ratelimit
if usage < cfg.LowSpeedWatermark {
return errors.New(txnLimit + ".")
}
if usage < cfg.BlockWriteWatermark {
return errors.New(fmt.Sprintf("%s when total used data %s is more than soft "+
"limit %s at beta stage.", txnLimit, humanReadable(usage), humanReadable(cfg.LowSpeedCapacity)))
}
return errors.New(fmt.Sprintf("%s when total used data %s is more than twice of "+
"soft limit %s at beta stage. Please delete some of your data to reclaim spaces. "+
"Be aware that deleting data is also throttled and can be slow.",
txnLimit, humanReadable(usage), humanReadable(cfg.LowSpeedWatermark)))
}
time.Sleep(wait + time.Millisecond*10)
}
}
return txn.extractKeyErr(err)
}

Expand Down
Loading

0 comments on commit e11f56c

Please sign in to comment.