Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/tikv/client-go/v2

go 1.23.0
go 1.23.12

toolchain go1.24.1

require (
github.com/VividCortex/ewma v1.2.0
Expand All @@ -22,7 +24,7 @@ require (
github.com/prometheus/client_model v0.6.1
github.com/stretchr/testify v1.9.0
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tikv/pd/client v0.0.0-20250625073039-fb496b371ff3
github.com/tikv/pd/client v0.0.0-20251110024032-264a65392018
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.10
go.etcd.io/etcd/client/v3 v3.5.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20250625073039-fb496b371ff3 h1:lAuYtEVeJQ+FaD1WrYEJhnMHl5YeREq39QSqCXGDk6s=
github.com/tikv/pd/client v0.0.0-20250625073039-fb496b371ff3/go.mod h1:SicyvcZE0fzrGGWW3AEtZWWPRzGw/h5img4/6qiSYws=
github.com/tikv/pd/client v0.0.0-20251110024032-264a65392018 h1:7VY8pOJikojUzh/ducMlhAhkn8wPvsEf1/uoUgK85Pc=
github.com/tikv/pd/client v0.0.0-20251110024032-264a65392018/go.mod h1:GYbMoLE5+FyMQ5f/1i6jsR7L6mLVkCLeldone1xxj1w=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
12 changes: 12 additions & 0 deletions integration_tests/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ func (c *hookedGCStatesClient) DeleteGCBarrier(ctx context.Context, barrierID st
return c.inner.DeleteGCBarrier(ctx, barrierID)
}

func (c *hookedGCStatesClient) SetGlobalGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*pdgc.GlobalGCBarrierInfo, error) {
return c.inner.SetGlobalGCBarrier(ctx, barrierID, barrierTS, ttl)
}

func (c *hookedGCStatesClient) DeleteGlobalGCBarrier(ctx context.Context, barrierID string) (*pdgc.GlobalGCBarrierInfo, error) {
return c.inner.DeleteGlobalGCBarrier(ctx, barrierID)
}

func (c *hookedGCStatesClient) GetAllKeyspacesGCStates(ctx context.Context) (pdgc.ClusterGCStates, error) {
return c.inner.GetAllKeyspacesGCStates(ctx)
}

func (c *hookedGCStatesClient) GetGCState(ctx context.Context) (pdgc.GCState, error) {
if c.getGCStatesHook != nil {
return c.getGCStatesHook(c.inner, ctx)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.14.4
github.com/tikv/client-go/v2 v2.0.8-0.20250528090949-e84f1a780fa6
github.com/tikv/pd/client v0.0.0-20250625073039-fb496b371ff3
github.com/tikv/pd/client v0.0.0-20251110024032-264a65392018
go.uber.org/goleak v1.3.0
google.golang.org/grpc v1.63.2
)
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1486,8 +1486,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20250625073039-fb496b371ff3 h1:lAuYtEVeJQ+FaD1WrYEJhnMHl5YeREq39QSqCXGDk6s=
github.com/tikv/pd/client v0.0.0-20250625073039-fb496b371ff3/go.mod h1:SicyvcZE0fzrGGWW3AEtZWWPRzGw/h5img4/6qiSYws=
github.com/tikv/pd/client v0.0.0-20251110024032-264a65392018 h1:7VY8pOJikojUzh/ducMlhAhkn8wPvsEf1/uoUgK85Pc=
github.com/tikv/pd/client v0.0.0-20251110024032-264a65392018/go.mod h1:GYbMoLE5+FyMQ5f/1i6jsR7L6mLVkCLeldone1xxj1w=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
Expand Down
8 changes: 7 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc()
}

// don't need to retry for ResourceGroup error
// don't need to retry for ResourceGroup error when specific errors are returned from server.
// when other error is returned from server (not leader etc.), back off and retry.
if errors.Is(err, pderr.ErrClientResourceGroupThrottled) {
return err
}
Expand All @@ -1771,6 +1772,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
}
var errGetResourceGroup *pderr.ErrClientGetResourceGroup
if errors.As(err, &errGetResourceGroup) {
if strings.Contains(errGetResourceGroup.Cause, "does not exist") {
return err
}
// back off and retry
err = bo.Backoff(retry.BoPDRPC, errors.Errorf("send pd request error:%v, ctx: %v, try next peer later", err, ctx))
return err
}

Expand Down
54 changes: 54 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,60 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrot
s.Run("AsyncAPI", test)
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupError() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)

// test ErrClientGetResourceGroup with "resource group does not exist" cause should not retry
func() {
oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
return nil, &pderr.ErrClientGetResourceGroup{Cause: "resource group does not exist"}
}}
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
_, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.NotNil(err)
var errGetResourceGroup *pderr.ErrClientGetResourceGroup
s.True(errors.As(err, &errGetResourceGroup))
s.Equal("resource group does not exist", errGetResourceGroup.Cause)
// should not have backoff since error is directly returned
s.Equal(0, bo.GetTotalBackoffTimes())
}()

// test ErrClientGetResourceGroup with other causes should retry with backoff
func() {
oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()
callCount := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
callCount++
if callCount < 2 {
return nil, &pderr.ErrClientGetResourceGroup{Cause: "not leader"}
}
// return success after first retry
return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{}}, nil
}}
bo := retry.NewBackofferWithVars(context.Background(), 2000, nil)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
// should have backoff since error triggered retry
s.Greater(bo.GetTotalBackoffTimes(), 0)
// should have retried at least once
s.GreaterOrEqual(callCount, 2)
}()
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() {
s.testOnSendFailedWithStoreRestart()
}
Expand Down
108 changes: 104 additions & 4 deletions internal/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type pdClient struct {
txnSafePoint uint64
// Represents the GC barriers for blocking GC from advancing.
gcBarriers map[string]uint64
// Represents the global GC barriers that block GC across all keyspaces.
globalGCBarriers map[string]uint64
// As there are still usages of SafePointKV, the txn safe point will still be put in the SavePointKV.
gcStatesMu sync.Mutex

Expand All @@ -108,9 +110,10 @@ type pdClient struct {
// from a Cluster.
func NewPDClient(cluster *Cluster, ops ...MockPDOption) *pdClient {
mockCli := &pdClient{
cluster: cluster,
gcBarriers: make(map[string]uint64),
groups: make(map[string]*rmpb.ResourceGroup),
cluster: cluster,
gcBarriers: make(map[string]uint64),
globalGCBarriers: make(map[string]uint64),
groups: make(map[string]*rmpb.ResourceGroup),
}

mockCli.groups[defaultResourceGroupName] = &rmpb.ResourceGroup{
Expand Down Expand Up @@ -527,19 +530,35 @@ func (c gcInternalController) AdvanceTxnSafePoint(ctx context.Context, target ui

minGCBarrierName := ""
var minGCBarrierTS uint64 = 0
isGlobalBarrier := false
for name, ts := range c.inner.gcBarriers {
if ts == 0 {
panic("found 0 in barrier ts of GC barriers")
}
if ts < minGCBarrierTS || minGCBarrierTS == 0 {
minGCBarrierName = name
minGCBarrierTS = ts
isGlobalBarrier = false
}
}
for name, ts := range c.inner.globalGCBarriers {
if ts == 0 {
panic("found 0 in barrier ts of global GC barriers")
}
if ts < minGCBarrierTS || minGCBarrierTS == 0 {
minGCBarrierName = name
minGCBarrierTS = ts
isGlobalBarrier = true
}
}

if minGCBarrierTS != 0 && minGCBarrierTS < res.NewTxnSafePoint {
res.NewTxnSafePoint = minGCBarrierTS
res.BlockerDescription = fmt.Sprintf("GCBarrier { BarrierID: %+q, BarrierTS: %d, ExpirationTime: <nil> }", minGCBarrierName, res.NewTxnSafePoint)
barrierType := "GCBarrier"
if isGlobalBarrier {
barrierType = "GlobalGCBarrier"
}
res.BlockerDescription = fmt.Sprintf("%s { BarrierID: %+q, BarrierTS: %d, ExpirationTime: <nil> }", barrierType, minGCBarrierName, res.NewTxnSafePoint)
logutil.Logger(ctx).Info("txn safe point blocked",
zap.Uint64("oldTxnSafePoint", res.OldTxnSafePoint), zap.Uint64("newTxnSafePoint", res.NewTxnSafePoint),
zap.String("blocker", res.BlockerDescription))
Expand Down Expand Up @@ -638,6 +657,87 @@ func (c gcStatesClient) DeleteGCBarrier(ctx context.Context, barrierID string) (
return pdgc.NewGCBarrierInfo(barrierID, barrierTS, pdgc.TTLNeverExpire, startTime), nil
}

func (c gcStatesClient) SetGlobalGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*pdgc.GlobalGCBarrierInfo, error) {
if c.keyspaceID != constants.NullKeyspaceID {
panic("unimplemented")
}

startTime := time.Now()

c.inner.gcStatesMu.Lock()
defer c.inner.gcStatesMu.Unlock()

if barrierTS == 0 || barrierID == "" || ttl <= 0 {
return nil, errors.New("invalid arguments")
}

if barrierTS < c.inner.txnSafePoint {
return nil, errors.Errorf("trying to set a global GC barrier on ts %d which is already behind the txn safe point %d", barrierTS, c.inner.txnSafePoint)
}

ttlToUse := ttl
if ttlToUse == 0 || ttlToUse == pdgc.TTLNeverExpire {
ttlToUse = pdgc.TTLNeverExpire
}

c.inner.globalGCBarriers[barrierID] = barrierTS
return pdgc.NewGlobalGCBarrierInfo(barrierID, barrierTS, ttlToUse, startTime), nil
}

func (c gcStatesClient) DeleteGlobalGCBarrier(ctx context.Context, barrierID string) (*pdgc.GlobalGCBarrierInfo, error) {
if c.keyspaceID != constants.NullKeyspaceID {
panic("unimplemented")
}

startTime := time.Now()

c.inner.gcStatesMu.Lock()
defer c.inner.gcStatesMu.Unlock()

barrierTS, exists := c.inner.globalGCBarriers[barrierID]
if !exists {
return nil, nil
}

delete(c.inner.globalGCBarriers, barrierID)
return pdgc.NewGlobalGCBarrierInfo(barrierID, barrierTS, pdgc.TTLNeverExpire, startTime), nil
}

func (c gcStatesClient) GetAllKeyspacesGCStates(ctx context.Context) (pdgc.ClusterGCStates, error) {
if c.keyspaceID != constants.NullKeyspaceID {
panic("unimplemented")
}

startTime := time.Now()

c.inner.gcStatesMu.Lock()
defer c.inner.gcStatesMu.Unlock()

gcStates := make(map[uint32]pdgc.GCState, 1)

localBarriers := make([]*pdgc.GCBarrierInfo, 0, len(c.inner.gcBarriers))
for barrierID, barrierTS := range c.inner.gcBarriers {
localBarriers = append(localBarriers, pdgc.NewGCBarrierInfo(barrierID, barrierTS, pdgc.TTLNeverExpire, startTime))
}

gcStates[constants.NullKeyspaceID] = pdgc.GCState{
KeyspaceID: constants.NullKeyspaceID,
TxnSafePoint: c.inner.txnSafePoint,
GCSafePoint: c.inner.gcSafePoint,
GCBarriers: localBarriers,
}

globalBarriers := make([]*pdgc.GlobalGCBarrierInfo, 0, len(c.inner.globalGCBarriers))
for barrierID, barrierTS := range c.inner.globalGCBarriers {
globalBarriers = append(globalBarriers, pdgc.NewGlobalGCBarrierInfo(barrierID, barrierTS, pdgc.TTLNeverExpire, startTime))
}

return pdgc.ClusterGCStates{
GCStates: gcStates,
GlobalGCBarriers: globalBarriers,
}, nil
}

func (c gcStatesClient) GetGCState(ctx context.Context) (pdgc.GCState, error) {
if c.keyspaceID != constants.NullKeyspaceID {
panic("unimplemented")
Expand Down
Loading