Skip to content

Commit

Permalink
copr: support load_based_replica_read_threshold (#40742)
Browse files Browse the repository at this point in the history
close #41664
  • Loading branch information
sticnarf authored Mar 3, 2023
1 parent 1d293d8 commit ac6a9eb
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.RequestSource.RequestSourceType = sv.RequestSourceType
builder.StoreBatchSize = sv.StoreBatchSize
builder.Request.ResourceGroupName = sv.ResourceGroupName
builder.Request.StoreBusyThreshold = sv.LoadBasedReplicaReadThreshold
return builder
}

Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ type Request struct {
ResourceGroupName string
// LimitSize indicates whether the request is scan and limit
LimitSize uint64
// StoreBusyThreshold is the threshold for the store to return ServerIsBusy
StoreBusyThreshold time.Duration
}

// CoprRequestAdjuster is used to check and adjust a copr request according to specific rules.
Expand Down
2 changes: 2 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ const (
TxnSource
// ResourceGroupName set the bind resource group name.
ResourceGroupName
// LoadBasedReplicaReadThreshold sets the TiKV wait duration threshold of enabling replica read automatically.
LoadBasedReplicaReadThreshold
)

// ReplicaReadType is the type of replica to read data from
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,10 @@ type SessionVars struct {
// EnableINLJoinInnerMultiPattern indicates whether enable multi pattern for index join inner side
// For now it is not public to user
EnableINLJoinInnerMultiPattern bool

// LoadBasedReplicaReadThreshold is the threshold for the estimated wait duration of a store.
// If exceeding the threshold, try other stores using replica read.
LoadBasedReplicaReadThreshold time.Duration
}

// planReplayerSessionFinishedTaskKeyLen is used to control the max size for the finished plan replayer task key in session
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2384,6 +2384,14 @@ var defaultSysVars = []*SysVar{
s.EnablePlanCacheForSubquery = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBLoadBasedReplicaReadThreshold, Value: time.Duration(DefTiDBLoadBasedReplicaReadThreshold).String(), Type: TypeDuration, MaxValue: uint64(time.Hour), SetSession: func(s *SessionVars, val string) error {
d, err := time.ParseDuration(val)
if err != nil {
return err
}
s.LoadBasedReplicaReadThreshold = d
return nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLRunningTasks, Value: strconv.Itoa(DefTiDBTTLRunningTasks), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,9 @@ const (

// TiDBEnablePlanCacheForSubquery controls whether prepare statement with subquery can be cached
TiDBEnablePlanCacheForSubquery = "tidb_enable_plan_cache_for_subquery"

// TiDBLoadBasedReplicaReadThreshold is the wait duration threshold to enable replica read automatically.
TiDBLoadBasedReplicaReadThreshold = "tidb_load_based_replica_read_threshold"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -1212,6 +1215,7 @@ const (
DefTiDBPessimisticTransactionAggressiveLocking = false
DefTiDBEnablePlanCacheForParamLimit = true
DefTiDBEnablePlanCacheForSubquery = true
DefTiDBLoadBasedReplicaReadThreshold = 0
)

// Process global variables.
Expand Down
3 changes: 3 additions & 0 deletions sessiontxn/internal/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,8 @@ func GetSnapshotWithTS(s sessionctx.Context, ts uint64, interceptor kv.SnapshotI
if tp := s.GetSessionVars().RequestSourceType; tp != "" {
snap.SetOption(kv.RequestSourceType, tp)
}
if s.GetSessionVars().LoadBasedReplicaReadThreshold > 0 {
snap.SetOption(kv.LoadBasedReplicaReadThreshold, s.GetSessionVars().LoadBasedReplicaReadThreshold)
}
return snap
}
4 changes: 4 additions & 0 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
txn.SetOption(kv.RequestSourceType, tp)
}

if sessVars.LoadBasedReplicaReadThreshold > 0 {
txn.SetOption(kv.LoadBasedReplicaReadThreshold, sessVars.LoadBasedReplicaReadThreshold)
}

p.txn = txn
return txn, nil
}
Expand Down
8 changes: 8 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,13 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch

cacheKey, cacheValue := worker.buildCacheKey(task, &copReq)

// TODO: Load-based replica read is currently not compatible with store batched tasks now.
// The batched tasks should be dispatched to their own followers, but it's not implemented yet.
// So, only enable load-based replica read when there is no batched tasks.
var busyThresholdMs uint32
if len(copReq.Tasks) == 0 {
busyThresholdMs = uint32(worker.req.StoreBusyThreshold.Milliseconds())
}
req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
Priority: priorityToPB(worker.req.Priority),
Expand All @@ -1144,6 +1151,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
TaskId: worker.req.TaskID,
RequestSource: task.requestSource.GetRequestSource(),
ResourceGroupName: worker.req.ResourceGroupName,
BusyThresholdMs: busyThresholdMs,
})
if worker.req.ResourceGroupTagger != nil {
worker.req.ResourceGroupTagger(req)
Expand Down
3 changes: 3 additions & 0 deletions store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package txn

import (
"context"
"time"
"unsafe"

"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -137,6 +138,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
}
case kv.ResourceGroupName:
s.KVSnapshot.SetResourceGroupName(val.(string))
case kv.LoadBasedReplicaReadThreshold:
s.KVSnapshot.SetLoadBasedReplicaReadThreshold(val.(time.Duration))
}
}

Expand Down
3 changes: 3 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
Expand Down Expand Up @@ -276,6 +277,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.KVTxn.SetTxnSource(val.(uint64))
case kv.ResourceGroupName:
txn.KVTxn.SetResourceGroupName(val.(string))
case kv.LoadBasedReplicaReadThreshold:
txn.KVTxn.GetSnapshot().SetLoadBasedReplicaReadThreshold(val.(time.Duration))
}
}

Expand Down

0 comments on commit ac6a9eb

Please sign in to comment.