diff --git a/DEPS.bzl b/DEPS.bzl index 3c199328beec7..b84df42aa6f94 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5893,13 +5893,13 @@ def go_deps(): name = "com_github_pingcap_tipb", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/tipb", - sha256 = "6e910c9689f1a81bad2ae55be1746d456c317d696ff2687390d3fb30f7d05c6d", - strip_prefix = "github.com/pingcap/tipb@v0.0.0-20240703084358-e46e4632bd2b", + sha256 = "598bb728ddaf83863e7c79c8790cd2ae907c7ae762b13bfcf4164da5b9e2e4dc", + strip_prefix = "github.com/pingcap/tipb@v0.0.0-20240823074000-a40c2347786e", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240823074000-a40c2347786e.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240823074000-a40c2347786e.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240823074000-a40c2347786e.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240823074000-a40c2347786e.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 5e92dc5fb10aa..e950405f8352c 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e - github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b + github.com/pingcap/tipb v0.0.0-20240823074000-a40c2347786e github.com/prometheus/client_golang v1.20.2 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.57.0 diff --git a/go.sum b/go.sum index 392fe58738f34..354d41e345934 100644 --- a/go.sum +++ b/go.sum @@ -676,8 +676,8 @@ github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfU github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530= -github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b h1:tySAGYw21A3Xa8CcA9jBTfrgAB3+KQWyqyW7fUyokzk= -github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20240823074000-a40c2347786e h1:chR6iRwU9MUivOYNH+26UqrO8Y7t3ZltX+Jukv+f+iM= +github.com/pingcap/tipb v0.0.0-20240823074000-a40c2347786e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 3c178ab46e003..82bb4bab1817e 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -160,7 +160,6 @@ go_library( "//pkg/util/mock", "//pkg/util/ppcpuusage", "//pkg/util/ranger", - "//pkg/util/resourcegrouptag", "//pkg/util/rowDecoder", "//pkg/util/rowcodec", "//pkg/util/set", diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 9ac1dccf3dd76..30a86c2d00f9e 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -62,7 +62,6 @@ import ( "github.com/pingcap/tidb/pkg/util/gcutil" "github.com/pingcap/tidb/pkg/util/generic" "github.com/pingcap/tidb/pkg/util/intest" - "github.com/tikv/client-go/v2/tikvrpc" clientv3 "go.etcd.io/etcd/client/v3" atomicutil "go.uber.org/atomic" "go.uber.org/zap" @@ -421,7 +420,7 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(jobID int64, jobType model.ActionType ctx.setDDLLabelForDiagnosis(jobType) } -func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) tikvrpc.ResourceGroupTagger { +func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) *kv.ResourceGroupTagBuilder { dc.jobCtx.Lock() defer dc.jobCtx.Unlock() ctx, exists := dc.jobCtx.jobCtxMap[jobID] diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 145b10a1c6240..c6fc8b723bde2 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -44,10 +44,8 @@ import ( tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/dbterror" tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/resourcegrouptag" "github.com/pingcap/tidb/pkg/util/topsql" topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state" - "github.com/tikv/client-go/v2/tikvrpc" kvutil "github.com/tikv/client-go/v2/util" atomicutil "go.uber.org/atomic" "go.uber.org/zap" @@ -637,17 +635,13 @@ func (w *worker) checkBeforeCommit() error { return nil } -func (w *ReorgContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { +func (w *ReorgContext) getResourceGroupTaggerForTopSQL() *kv.ResourceGroupTagBuilder { if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil { return nil } digest := w.cacheDigest - tagger := func(req *tikvrpc.Request) { - req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, nil, - resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req))) - } - return tagger + return kv.NewResourceGroupTagBuilder().SetSQLDigest(digest) } func (w *ReorgContext) ddlJobSourceType() string { diff --git a/pkg/distsql/BUILD.bazel b/pkg/distsql/BUILD.bazel index cd8b4ea89ed78..a3be4231a4e8f 100644 --- a/pkg/distsql/BUILD.bazel +++ b/pkg/distsql/BUILD.bazel @@ -44,7 +44,6 @@ go_library( "@com_github_pingcap_tipb//go-tipb", "@com_github_tikv_client_go_v2//metrics", "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//tikvrpc/interceptor", "@com_github_tikv_client_go_v2//util", "@org_golang_google_grpc//metadata", @@ -66,7 +65,7 @@ go_test( embed = [":distsql"], flaky = True, race = "on", - shard_count = 27, + shard_count = 28, deps = [ "//pkg/distsql/context", "//pkg/errctx", @@ -93,6 +92,7 @@ go_test( "//pkg/util/mock", "//pkg/util/paging", "//pkg/util/ranger", + "@com_github_pingcap_kvproto//pkg/coprocessor", "@com_github_pingcap_tipb//go-tipb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//kv", diff --git a/pkg/distsql/context/BUILD.bazel b/pkg/distsql/context/BUILD.bazel index 1ec9bbf453197..635681bb46e71 100644 --- a/pkg/distsql/context/BUILD.bazel +++ b/pkg/distsql/context/BUILD.bazel @@ -18,7 +18,6 @@ go_library( "//pkg/util/tiflash", "//pkg/util/topsql/stmtstats", "@com_github_tikv_client_go_v2//kv", - "@com_github_tikv_client_go_v2//tikvrpc", ], ) diff --git a/pkg/distsql/context/context.go b/pkg/distsql/context/context.go index 11fc041003c2a..9612a30f1048f 100644 --- a/pkg/distsql/context/context.go +++ b/pkg/distsql/context/context.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/pkg/util/tiflash" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/tikvrpc" ) // DistSQLContext provides all information needed by using functions in `distsql` @@ -68,7 +67,7 @@ type DistSQLContext struct { NotFillCache bool TaskID uint64 Priority mysql.PriorityEnum - ResourceGroupTagger tikvrpc.ResourceGroupTagger + ResourceGroupTagger *kv.ResourceGroupTagBuilder EnablePaging bool MinPagingSize int MaxPagingSize int diff --git a/pkg/distsql/request_builder.go b/pkg/distsql/request_builder.go index 9e6bfe16af4b0..313dbdd79c44d 100644 --- a/pkg/distsql/request_builder.go +++ b/pkg/distsql/request_builder.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/ranger" "github.com/pingcap/tipb/go-tipb" - "github.com/tikv/client-go/v2/tikvrpc" ) // RequestBuilder is used to build a "kv.Request". @@ -374,7 +373,7 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.MetaOnlyInfoSchem } // SetResourceGroupTagger sets the request resource group tagger. -func (builder *RequestBuilder) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) *RequestBuilder { +func (builder *RequestBuilder) SetResourceGroupTagger(tagger *kv.ResourceGroupTagBuilder) *RequestBuilder { builder.Request.ResourceGroupTagger = tagger return builder } diff --git a/pkg/distsql/request_builder_test.go b/pkg/distsql/request_builder_test.go index 7641f7269d18f..93726a0c5be52 100644 --- a/pkg/distsql/request_builder_test.go +++ b/pkg/distsql/request_builder_test.go @@ -18,7 +18,9 @@ import ( "math" "testing" "time" + "unsafe" + "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/resourcegroup" @@ -32,6 +34,8 @@ import ( "github.com/pingcap/tidb/pkg/util/ranger" "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" + clikv "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/tikvrpc" ) type handleRange struct { @@ -852,3 +856,42 @@ func TestBuildTableRangeCommonHandle(t *testing.T) { {StartKey: tablecodec.EncodeRowKey(7, low), EndKey: tablecodec.EncodeRowKey(7, high)}, }, ranges) } + +func TestRequestBuilderHandle(t *testing.T) { + handles := []kv.Handle{kv.IntHandle(0), kv.IntHandle(2), kv.IntHandle(3), kv.IntHandle(4), + kv.IntHandle(5), kv.IntHandle(10), kv.IntHandle(11), kv.IntHandle(100)} + + resourceTagBuilder := kv.NewResourceGroupTagBuilder() + tableID := int64(15) + actual, err := (&RequestBuilder{}).SetTableHandles(tableID, handles). + SetDAGRequest(&tipb.DAGRequest{}). + SetDesc(false). + SetKeepOrder(false). + SetFromSessionVars(DefaultDistSQLContext). + SetResourceGroupTagger(resourceTagBuilder). + Build() + require.NoError(t, err) + ranges := make([]*coprocessor.KeyRange, 0, actual.KeyRanges.TotalRangeNum()) + actual.KeyRanges.ForEachPartition( + func(innerRanges []kv.KeyRange) { + for _, ran := range innerRanges { + ranges = append(ranges, (*coprocessor.KeyRange)(unsafe.Pointer(&ran))) + } + }) + + copReq := coprocessor.Request{ + Tp: actual.Tp, + StartTs: actual.StartTs, + Data: actual.Data, + Ranges: ranges, + } + var seed uint32 + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdCop, &copReq, clikv.ReplicaReadLeader, &seed) + actual.ResourceGroupTagger.Build(req) + + // the request should have the resource group tag, and the tag should contain the table id + tag := &tipb.ResourceGroupTag{} + err = tag.Unmarshal(req.ResourceGroupTag) + require.NoError(t, err) + require.Equal(t, tag.TableId, tableID) +} diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 54ce540fbabf2..ede5b300afcdb 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -83,7 +83,6 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/memory" - "github.com/pingcap/tidb/pkg/util/resourcegrouptag" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/pingcap/tidb/pkg/util/topsql" @@ -1223,13 +1222,16 @@ func newLockCtx(sctx sessionctx.Context, lockWaitTime int64, numKeys int) (*tikv return nil } if mutation := req.Mutations[0]; mutation != nil { - label := resourcegrouptag.GetResourceGroupLabelByKey(mutation.Key) normalized, digest := seVars.StmtCtx.SQLDigest() if len(normalized) == 0 { return nil } _, planDigest := seVars.StmtCtx.GetPlanDigest() - return resourcegrouptag.EncodeResourceGroupTag(digest, planDigest, label) + + return kv.NewResourceGroupTagBuilder(). + SetPlanDigest(planDigest). + SetSQLDigest(digest). + EncodeTagWithKey(mutation.Key) } return nil } diff --git a/pkg/executor/resource_tag_test.go b/pkg/executor/resource_tag_test.go index a65a096afdb4e..f3afb431b548d 100644 --- a/pkg/executor/resource_tag_test.go +++ b/pkg/executor/resource_tag_test.go @@ -57,7 +57,7 @@ func TestResourceGroupTag(t *testing.T) { var sqlDigest, planDigest *parser.Digest var tagLabel tipb.ResourceGroupTagLabel - checkFn := func() {} + checkFn := func(int64) {} unistoreRPCClientSendHook := func(req *tikvrpc.Request) { var startKey []byte var ctx *kvrpcpb.Context @@ -99,8 +99,10 @@ func TestResourceGroupTag(t *testing.T) { require.NoError(t, err) sqlDigest = parser.NewDigest(tag.SqlDigest) planDigest = parser.NewDigest(tag.PlanDigest) - tagLabel = *tag.Label - checkFn() + if tag.Label != nil { + tagLabel = *tag.Label + } + checkFn(tag.TableId) } unistore.UnistoreRPCClientSendHook.Store(&unistoreRPCClientSendHook) @@ -187,10 +189,11 @@ func TestResourceGroupTag(t *testing.T) { _, expectSQLDigest := parser.NormalizeDigest(ca.sql) var expectPlanDigest *parser.Digest checkCnt := 0 - checkFn = func() { + checkFn = func(tid int64) { if ca.ignore { return } + require.Equal(t, tbInfo.Meta().ID, tid) if expectPlanDigest == nil { info := tk.Session().ShowProcess() require.NotNil(t, info) @@ -208,7 +211,6 @@ func TestResourceGroupTag(t *testing.T) { require.True(t, ok) checkCnt++ } - if strings.HasPrefix(ca.sql, "select") { tk.MustQuery(ca.sql) } else { diff --git a/pkg/kv/BUILD.bazel b/pkg/kv/BUILD.bazel index c7955b8c5358a..196b70fc3203b 100644 --- a/pkg/kv/BUILD.bazel +++ b/pkg/kv/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/config", "//pkg/errno", "//pkg/meta/model", + "//pkg/parser", "//pkg/parser/mysql", "//pkg/parser/terror", "//pkg/resourcegroup", @@ -34,6 +35,7 @@ go_library( "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/memory", + "//pkg/util/resourcegrouptag", "//pkg/util/set", "//pkg/util/size", "//pkg/util/tiflash", diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index 184d8b1eac8a0..b2ad11cb33dec 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -27,10 +27,13 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/util/memory" + "github.com/pingcap/tidb/pkg/util/resourcegrouptag" "github.com/pingcap/tidb/pkg/util/tiflash" "github.com/pingcap/tidb/pkg/util/trxevents" + "github.com/pingcap/tipb/go-tipb" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -576,7 +579,7 @@ type Request struct { // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels []*metapb.StoreLabel // ResourceGroupTagger indicates the kv request task group tagger. - ResourceGroupTagger tikvrpc.ResourceGroupTagger + ResourceGroupTagger *ResourceGroupTagBuilder // Paging indicates whether the request is a paging request. Paging struct { Enable bool @@ -766,3 +769,76 @@ const ( // RCCheckTS stands for 'read consistency read with ts check'. RCCheckTS ) + +// ResourceGroupTagBuilder is used to build the resource group tag for a kv request. +type ResourceGroupTagBuilder struct { + sqlDigest *parser.Digest + planDigest *parser.Digest + accessKey []byte +} + +// NewResourceGroupTagBuilder creates a new ResourceGroupTagBuilder. +func NewResourceGroupTagBuilder() *ResourceGroupTagBuilder { + return &ResourceGroupTagBuilder{} +} + +// SetSQLDigest sets the sql digest for the request. +func (b *ResourceGroupTagBuilder) SetSQLDigest(digest *parser.Digest) *ResourceGroupTagBuilder { + b.sqlDigest = digest + return b +} + +// SetPlanDigest sets the plan digest for the request. +func (b *ResourceGroupTagBuilder) SetPlanDigest(digest *parser.Digest) *ResourceGroupTagBuilder { + b.planDigest = digest + return b +} + +// BuildProtoTagger sets the access key for the request. +func (b *ResourceGroupTagBuilder) BuildProtoTagger() tikvrpc.ResourceGroupTagger { + return func(req *tikvrpc.Request) { + b.Build(req) + } +} + +// EncodeTagWithKey encodes the resource group tag, returns the encoded bytes. +func (b *ResourceGroupTagBuilder) EncodeTagWithKey(key []byte) []byte { + tag := &tipb.ResourceGroupTag{} + if b.sqlDigest != nil { + tag.SqlDigest = b.sqlDigest.Bytes() + } + if b.planDigest != nil { + tag.PlanDigest = b.planDigest.Bytes() + } + if len(key) > 0 { + tag.TableId = decodeTableID(key) + label := resourcegrouptag.GetResourceGroupLabelByKey(key) + tag.Label = &label + } + tagEncoded, err := tag.Marshal() + if err != nil { + return nil + } + return tagEncoded +} + +// Build builds the resource group tag for the request. +func (b *ResourceGroupTagBuilder) Build(req *tikvrpc.Request) { + if req == nil { + return + } + if encodedBytes := b.EncodeTagWithKey(resourcegrouptag.GetFirstKeyFromRequest(req)); len(encodedBytes) > 0 { + req.ResourceGroupTag = encodedBytes + } +} + +// DecodeTableIDFunc is used to decode table id from key. +var DecodeTableIDFunc func(Key) int64 + +// avoid import cycle, not import tablecodec in kv package. +func decodeTableID(key Key) int64 { + if DecodeTableIDFunc != nil { + return DecodeTableIDFunc(key) + } + return 0 +} diff --git a/pkg/sessionctx/stmtctx/BUILD.bazel b/pkg/sessionctx/stmtctx/BUILD.bazel index e61dd9b4a7a2b..0d7a0ee29b39c 100644 --- a/pkg/sessionctx/stmtctx/BUILD.bazel +++ b/pkg/sessionctx/stmtctx/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/distsql/context", "//pkg/errctx", + "//pkg/kv", "//pkg/meta/model", "//pkg/parser", "//pkg/parser/mysql", @@ -25,10 +26,8 @@ go_library( "//pkg/util/linter/constructor", "//pkg/util/memory", "//pkg/util/nocopy", - "//pkg/util/resourcegrouptag", "//pkg/util/topsql/stmtstats", "//pkg/util/tracing", - "@com_github_tikv_client_go_v2//tikvrpc", "@org_golang_x_exp//maps", "@org_golang_x_sync//singleflight", "@org_uber_go_atomic//:atomic", diff --git a/pkg/sessionctx/stmtctx/stmtctx.go b/pkg/sessionctx/stmtctx/stmtctx.go index 83376dcdf6def..92f9302f40c9c 100644 --- a/pkg/sessionctx/stmtctx/stmtctx.go +++ b/pkg/sessionctx/stmtctx/stmtctx.go @@ -27,6 +27,7 @@ import ( distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -44,10 +45,8 @@ import ( "github.com/pingcap/tidb/pkg/util/linter/constructor" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/nocopy" - "github.com/pingcap/tidb/pkg/util/resourcegrouptag" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" "github.com/pingcap/tidb/pkg/util/tracing" - "github.com/tikv/client-go/v2/tikvrpc" atomic2 "go.uber.org/atomic" "golang.org/x/exp/maps" "golang.org/x/sync/singleflight" @@ -686,20 +685,14 @@ func (sc *StatementContext) SetBinaryPlan(binaryPlan string) { sc.binaryPlan = binaryPlan } -// GetResourceGroupTagger returns the implementation of tikvrpc.ResourceGroupTagger related to self. -func (sc *StatementContext) GetResourceGroupTagger() tikvrpc.ResourceGroupTagger { +// GetResourceGroupTagger returns the implementation of kv.ResourceGroupTagBuilder related to self. +func (sc *StatementContext) GetResourceGroupTagger() *kv.ResourceGroupTagBuilder { + tagger := kv.NewResourceGroupTagBuilder().SetPlanDigest(sc.planDigest) normalized, digest := sc.SQLDigest() - planDigest := sc.planDigest - return func(req *tikvrpc.Request) { - if req == nil { - return - } - if len(normalized) == 0 { - return - } - req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, planDigest, - resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req))) + if len(normalized) > 0 { + tagger.SetSQLDigest(digest) } + return tagger } // SetUseChunkAlloc set use chunk alloc status diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index 4c2849395c398..5a1847e99ae86 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -1353,7 +1353,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo }, }) if b.req.ResourceGroupTagger != nil { - b.req.ResourceGroupTagger(req) + b.req.ResourceGroupTagger.Build(req) } req.StoreTp = getEndPointType(kv.TiFlash) diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index 8e0c81fba348d..8353902a76439 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -1223,7 +1223,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch req.IsRetryRequest = true } if worker.req.ResourceGroupTagger != nil { - worker.req.ResourceGroupTagger(req) + worker.req.ResourceGroupTagger.Build(req) } timeout := config.GetGlobalConfig().TiKVClient.CoprReqTimeout if task.tikvClientReadTimeout > 0 { diff --git a/pkg/store/driver/txn/snapshot.go b/pkg/store/driver/txn/snapshot.go index 5c1edee5362b9..d1494cabc35de 100644 --- a/pkg/store/driver/txn/snapshot.go +++ b/pkg/store/driver/txn/snapshot.go @@ -118,7 +118,12 @@ func (s *tikvSnapshot) SetOption(opt int, val any) { case kv.ResourceGroupTag: s.KVSnapshot.SetResourceGroupTag(val.([]byte)) case kv.ResourceGroupTagger: - s.KVSnapshot.SetResourceGroupTagger(val.(tikvrpc.ResourceGroupTagger)) + switch tagger := val.(type) { + case tikvrpc.ResourceGroupTagger: + s.KVSnapshot.SetResourceGroupTagger(tagger) + case *kv.ResourceGroupTagBuilder: + s.KVSnapshot.SetResourceGroupTagger(tagger.BuildProtoTagger()) + } case kv.ReadReplicaScope: s.KVSnapshot.SetReadReplicaScope(val.(string)) case kv.SnapInterceptor: diff --git a/pkg/store/driver/txn/txn_driver.go b/pkg/store/driver/txn/txn_driver.go index c0074c728294a..5ff918e365881 100644 --- a/pkg/store/driver/txn/txn_driver.go +++ b/pkg/store/driver/txn/txn_driver.go @@ -269,7 +269,12 @@ func (txn *tikvTxn) SetOption(opt int, val any) { case kv.ResourceGroupTag: txn.KVTxn.SetResourceGroupTag(val.([]byte)) case kv.ResourceGroupTagger: - txn.KVTxn.SetResourceGroupTagger(val.(tikvrpc.ResourceGroupTagger)) + switch tagger := val.(type) { + case tikvrpc.ResourceGroupTagger: + txn.KVTxn.SetResourceGroupTagger(tagger) + case *kv.ResourceGroupTagBuilder: + txn.KVTxn.SetResourceGroupTagger(tagger.BuildProtoTagger()) + } case kv.KVFilter: txn.KVTxn.SetKVFilter(val.(tikv.KVFilter)) case kv.SnapInterceptor: diff --git a/pkg/tablecodec/tablecodec.go b/pkg/tablecodec/tablecodec.go index ff7819b6f9e01..ffbfe30dc62f0 100644 --- a/pkg/tablecodec/tablecodec.go +++ b/pkg/tablecodec/tablecodec.go @@ -78,6 +78,17 @@ const ( // TableSplitKeyLen is the length of key 't{table_id}' which is used for table split. const TableSplitKeyLen = 1 + idLen +func init() { + // help kv package to refer the tablecodec package to resolve the kv.Key functions. + kv.DecodeTableIDFunc = func(key kv.Key) int64 { + //preCheck, avoid the noise error log. + if hasTablePrefix(key) && len(key) >= TableSplitKeyLen { + return DecodeTableID(key) + } + return 0 + } +} + // TablePrefix returns table's prefix 't'. func TablePrefix() []byte { return tablePrefix diff --git a/pkg/util/resourcegrouptag/resource_group_tag.go b/pkg/util/resourcegrouptag/resource_group_tag.go index fba6a5fdfc66c..1a2ca9b2ff789 100644 --- a/pkg/util/resourcegrouptag/resource_group_tag.go +++ b/pkg/util/resourcegrouptag/resource_group_tag.go @@ -125,6 +125,12 @@ func GetFirstKeyFromRequest(req *tikvrpc.Request) (firstKey []byte) { } } } + case *kvrpcpb.PessimisticLockRequest: + r := req.Req.(*kvrpcpb.PessimisticLockRequest) + if len(r.Mutations) == 0 { + return nil + } + return r.Mutations[0].Key } return } diff --git a/pkg/util/resourcegrouptag/resource_group_tag_test.go b/pkg/util/resourcegrouptag/resource_group_tag_test.go index 8c154ae0de6d5..cfcaa3cd1d58e 100644 --- a/pkg/util/resourcegrouptag/resource_group_tag_test.go +++ b/pkg/util/resourcegrouptag/resource_group_tag_test.go @@ -31,7 +31,7 @@ import ( func TestResourceGroupTagEncoding(t *testing.T) { sqlDigest := parser.NewDigest(nil) tag := EncodeResourceGroupTag(sqlDigest, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown) - require.Len(t, tag, 2) + require.Len(t, tag, 4) decodedSQLDigest, err := DecodeResourceGroupTag(tag) require.NoError(t, err) @@ -40,7 +40,7 @@ func TestResourceGroupTagEncoding(t *testing.T) { sqlDigest = parser.NewDigest([]byte{'a', 'a'}) tag = EncodeResourceGroupTag(sqlDigest, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown) // version(1) + prefix(1) + length(1) + content(2hex -> 1byte) + label(2) - require.Len(t, tag, 6) + require.Len(t, tag, 8) decodedSQLDigest, err = DecodeResourceGroupTag(tag) require.NoError(t, err) @@ -69,7 +69,7 @@ func TestResourceGroupTagEncodingPB(t *testing.T) { } buf, err := resourceTag.Marshal() require.NoError(t, err) - require.Len(t, buf, 68) + require.Len(t, buf, 70) tag := &tipb.ResourceGroupTag{} err = tag.Unmarshal(buf) @@ -83,7 +83,7 @@ func TestResourceGroupTagEncodingPB(t *testing.T) { } buf, err = resourceTag.Marshal() require.NoError(t, err) - require.Len(t, buf, 34) + require.Len(t, buf, 36) tag = &tipb.ResourceGroupTag{} err = tag.Unmarshal(buf)