Skip to content

Commit

Permalink
session: support leader-and-follower for tidb_replica_read (#14761) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Mar 26, 2020
1 parent 046d35d commit 8c1cde1
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 3 deletions.
7 changes: 4 additions & 3 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ const (
ReplicaReadLeader ReplicaReadType = 1 << iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadLearner stands for 'read from learner'.
ReplicaReadLearner
// ReplicaReadMixed stands for 'read from leader and follower and learner'.
ReplicaReadMixed
)

// IsFollowerRead checks if leader is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r == ReplicaReadFollower
// In some cases the default value is 0, which should be treated as `ReplicaReadLeader`.
return r != ReplicaReadLeader && r != 0
}

// Those limits is enforced to make sure the transaction can be well handled by TiKV.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.ReplicaRead = kv.ReplicaReadFollower
} else if strings.EqualFold(val, "leader-and-follower") {
s.ReplicaRead = kv.ReplicaReadMixed
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
}
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
case TiDBReplicaRead:
if strings.EqualFold(value, "follower") {
return "follower", nil
} else if strings.EqualFold(value, "leader-and-follower") {
return "leader-and-follower", nil
} else if strings.EqualFold(value, "leader") || len(value) == 0 {
return "leader", nil
}
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(val, Equals, "leader")
c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader)

SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader-and-follower"))
val, err = GetSessionSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "leader-and-follower")
c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadMixed)

SetSessionSystemVar(v, TiDBEnableStmtSummary, types.NewStringDatum("on"))
val, err = GetSessionSystemVar(v, TiDBEnableStmtSummary)
c.Assert(err, IsNil)
Expand Down
26 changes: 26 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,25 @@ func (r *RegionStore) follower(seed uint32) int32 {
return r.workTiKVIdx
}

// return next leader or follower store's index
func (r *RegionStore) peer(seed uint32) int32 {
candidates := make([]int32, 0, len(r.stores))
for i := 0; i < len(r.stores); i++ {
if r.stores[i].storeType != kv.TiKV {
continue
}
if r.storeFails[i] != atomic.LoadUint32(&r.stores[i].fail) {
continue
}
candidates = append(candidates, int32(i))
}

if len(candidates) == 0 {
return r.workTiKVIdx
}
return candidates[int32(seed)%int32(len(candidates))]
}

// init initializes region after constructed.
func (r *Region) init(c *RegionCache) {
// region store pull used store from global store map
Expand Down Expand Up @@ -305,6 +324,8 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
switch replicaRead {
case kv.ReplicaReadFollower:
store, peer, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed)
case kv.ReplicaReadMixed:
store, peer, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed)
default:
store, peer, storeIdx = cachedRegion.WorkStorePeer(regionStore)
}
Expand Down Expand Up @@ -959,6 +980,11 @@ func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (*
return r.getStorePeer(rs, rs.follower(followerStoreSeed))
}

// AnyStorePeer returns a leader or follower store with the associated peer.
func (r *Region) AnyStorePeer(rs *RegionStore, followerStoreSeed uint32) (*Store, *metapb.Peer, int) {
return r.getStorePeer(rs, rs.peer(followerStoreSeed))
}

// RegionVerID is a unique ID that can identify a Region at a specific version.
type RegionVerID struct {
id uint64
Expand Down
62 changes: 62 additions & 0 deletions store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,41 @@ func (s *testRegionCacheSuite) TestFollowerReadFallback(c *C) {
c.Assert(ctx.Peer.Id, Equals, peer3)
}

func (s *testRegionCacheSuite) TestMixedReadFallback(c *C) {
// 3 nodes and no.1 is leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)

loc, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer1)
c.Assert(len(ctx.Meta.Peers), Equals, 3)

// verify follower to be store1, store2 and store3
ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 0)
c.Assert(err, IsNil)
c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1)

ctxFollower2, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 1)
c.Assert(err, IsNil)
c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2)

ctxFollower3, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 2)
c.Assert(err, IsNil)
c.Assert(ctxFollower3.Peer.Id, Equals, peer3)

// send fail on store2, next follower read is going to fallback to store3
s.cache.OnSendFail(s.bo, ctxFollower1, false, errors.New("test error"))
ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 0)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer2)
}

func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch(c *C) {
// 3 nodes and no.1 is region1 leader.
store3 := s.cluster.AllocID()
Expand Down Expand Up @@ -872,6 +907,33 @@ func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch(c *C) {
c.Assert(followReqSeed, Equals, uint32(1))
}

func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch(c *C) {
// 3 nodes and no.1 is region1 leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)

// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, Equals, s.region1)

reqSend := NewRegionRequestSender(s.cache, nil)

// follower read failed on store1
followReqSeed := uint32(0)
ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadMixed, followReqSeed)
c.Assert(err, IsNil)
c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1)
c.Assert(ctxFollower1.Store.storeID, Equals, s.store1)

regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
c.Assert(followReqSeed, Equals, uint32(1))
}

func createSampleRegion(startKey, endKey []byte) *Region {
return &Region{
meta: &metapb.Region{
Expand Down

0 comments on commit 8c1cde1

Please sign in to comment.