Skip to content

Commit 0d15c52

Browse files
authored
Support follower read (tikv#96)
1 parent 9dc52d8 commit 0d15c52

File tree

8 files changed

+106
-27
lines changed

8 files changed

+106
-27
lines changed

src/main/java/org/tikv/common/PDClient.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
224224
resp.getLeader(),
225225
conf.getIsolationLevel(),
226226
conf.getCommandPriority(),
227-
conf.getKvMode());
227+
conf.getKvMode(),
228+
conf.isReplicaRead());
228229
}
229230

230231
@Override
@@ -237,7 +238,8 @@ public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key)
237238
resp.getLeader(),
238239
conf.getIsolationLevel(),
239240
conf.getCommandPriority(),
240-
conf.getKvMode()));
241+
conf.getKvMode(),
242+
conf.isReplicaRead()));
241243
Supplier<GetRegionRequest> request =
242244
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
243245

@@ -263,7 +265,8 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
263265
resp.getLeader(),
264266
conf.getIsolationLevel(),
265267
conf.getCommandPriority(),
266-
conf.getKvMode());
268+
conf.getKvMode(),
269+
conf.isReplicaRead());
267270
}
268271

269272
@Override
@@ -276,7 +279,8 @@ public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
276279
resp.getLeader(),
277280
conf.getIsolationLevel(),
278281
conf.getCommandPriority(),
279-
conf.getKvMode()));
282+
conf.getKvMode(),
283+
conf.isReplicaRead()));
280284

281285
Supplier<GetRegionByIDRequest> request =
282286
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
@@ -334,6 +338,11 @@ public List<Store> getAllStores(BackOffer backOffer) {
334338
.getStoresList();
335339
}
336340

341+
@Override
342+
public boolean isReplicaRead() {
343+
return conf.isReplicaRead();
344+
}
345+
337346
@Override
338347
public void close() throws InterruptedException {
339348
etcdClient.close();

src/main/java/org/tikv/common/ReadOnlyPDClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,6 @@ public interface ReadOnlyPDClient {
6363
Future<Store> getStoreAsync(BackOffer backOffer, long storeId);
6464

6565
List<Store> getAllStores(BackOffer backOffer);
66+
67+
boolean isReplicaRead();
6668
}

src/main/java/org/tikv/common/TiConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class TiConfiguration implements Serializable {
5252
private static final int DEF_KV_CLIENT_CONCURRENCY = 10;
5353
private static final KVMode DEF_KV_MODE = KVMode.TXN;
5454
private static final int DEF_RAW_CLIENT_CONCURRENCY = 200;
55+
private static final boolean DEF_IS_REPLICA_READ = false;
5556

5657
private int timeout = DEF_TIMEOUT;
5758
private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT;
@@ -78,6 +79,7 @@ public class TiConfiguration implements Serializable {
7879
private int partitionPerSplit = DEF_PARTITION_PER_SPLIT;
7980

8081
private int kvClientConcurrency = DEF_KV_CLIENT_CONCURRENCY;
82+
private boolean isReplicaRead = DEF_IS_REPLICA_READ;
8183

8284
public enum KVMode {
8385
TXN,
@@ -314,4 +316,12 @@ public int getKvClientConcurrency() {
314316
public void setKvClientConcurrency(int kvClientConcurrency) {
315317
this.kvClientConcurrency = kvClientConcurrency;
316318
}
319+
320+
public boolean isReplicaRead() {
321+
return isReplicaRead;
322+
}
323+
324+
public void setReplicaRead(boolean isReplicaRead) {
325+
this.isReplicaRead = isReplicaRead;
326+
}
317327
}

src/main/java/org/tikv/common/region/RegionManager.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class RegionManager {
4949
// TODO: the region cache logic need rewrite.
5050
// https://github.com/pingcap/tispark/issues/1170
5151
private final RegionCache cache;
52+
private final boolean isReplicaRead;
5253

5354
private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
5455

@@ -57,11 +58,13 @@ public class RegionManager {
5758
public RegionManager(
5859
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
5960
this.cache = new RegionCache(pdClient);
61+
this.isReplicaRead = pdClient.isReplicaRead();
6062
this.cacheInvalidateCallback = cacheInvalidateCallback;
6163
}
6264

6365
public RegionManager(ReadOnlyPDClient pdClient) {
6466
this.cache = new RegionCache(pdClient);
67+
this.isReplicaRead = pdClient.isReplicaRead();
6568
this.cacheInvalidateCallback = null;
6669
}
6770

@@ -112,8 +115,13 @@ public Pair<TiRegion, Store> getRegionStorePairByKey(
112115

113116
Store store = null;
114117
if (storeType == TiStoreType.TiKV) {
115-
Peer leader = region.getLeader();
116-
store = cache.getStoreById(leader.getStoreId(), backOffer);
118+
if (isReplicaRead) {
119+
Peer peer = region.getCurrentFollower();
120+
store = cache.getStoreById(peer.getStoreId(), backOffer);
121+
} else {
122+
Peer leader = region.getLeader();
123+
store = cache.getStoreById(leader.getStoreId(), backOffer);
124+
}
117125
} else {
118126
outerLoop:
119127
for (Peer peer : region.getLearnerList()) {

src/main/java/org/tikv/common/region/RegionStoreClient.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private synchronized Boolean getIsV4() {
7979
private RegionStoreClient(
8080
TiConfiguration conf,
8181
TiRegion region,
82-
Store store,
82+
String storeVersion,
8383
TiStoreType storeType,
8484
ChannelFactory channelFactory,
8585
TikvBlockingStub blockingStub,
@@ -93,7 +93,7 @@ private RegionStoreClient(
9393
if (this.storeType == TiStoreType.TiKV) {
9494
this.lockResolverClient =
9595
AbstractLockResolverClient.getInstance(
96-
store,
96+
storeVersion,
9797
conf,
9898
region,
9999
this.blockingStub,
@@ -118,7 +118,7 @@ private RegionStoreClient(
118118

119119
this.lockResolverClient =
120120
AbstractLockResolverClient.getInstance(
121-
tikvStore,
121+
tikvStore.getVersion(),
122122
conf,
123123
region,
124124
tikvBlockingStub,
@@ -790,7 +790,8 @@ public List<TiRegion> splitRegion(Iterable<ByteString> splitKeys) {
790790
null,
791791
conf.getIsolationLevel(),
792792
conf.getCommandPriority(),
793-
conf.getKvMode()))
793+
conf.getKvMode(),
794+
conf.isReplicaRead()))
794795
.collect(Collectors.toList());
795796
}
796797

@@ -1021,7 +1022,7 @@ public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeTy
10211022
return new RegionStoreClient(
10221023
conf,
10231024
region,
1024-
store,
1025+
store.getVersion(),
10251026
storeType,
10261027
channelFactory,
10271028
blockingStub,

src/main/java/org/tikv/common/region/TiRegion.java

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,27 +41,48 @@ public class TiRegion implements Serializable {
4141
private final Region meta;
4242
private final IsolationLevel isolationLevel;
4343
private final Kvrpcpb.CommandPri commandPri;
44-
private Peer peer;
44+
private Peer leader;
45+
private int followerIdx = 0;
46+
private final boolean isReplicaRead;
4547

4648
public TiRegion(
4749
Region meta,
48-
Peer peer,
50+
Peer leader,
4951
IsolationLevel isolationLevel,
5052
Kvrpcpb.CommandPri commandPri,
5153
KVMode kvMode) {
54+
this(meta, leader, isolationLevel, commandPri, kvMode, false);
55+
}
56+
57+
public TiRegion(
58+
Region meta,
59+
Peer leader,
60+
IsolationLevel isolationLevel,
61+
Kvrpcpb.CommandPri commandPri,
62+
KVMode kvMode,
63+
boolean isReplicaRead) {
5264
Objects.requireNonNull(meta, "meta is null");
5365
this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
54-
if (peer == null || peer.getId() == 0) {
66+
if (leader == null || leader.getId() == 0) {
5567
if (meta.getPeersCount() == 0) {
5668
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
5769
}
5870
// region's first peer is leader.
59-
this.peer = meta.getPeers(0);
71+
this.leader = meta.getPeers(0);
6072
} else {
61-
this.peer = peer;
73+
this.leader = leader;
74+
}
75+
if (isReplicaRead && meta.getPeersCount() > 0) {
76+
// try to get first follower
77+
try {
78+
getNextFollower();
79+
} catch (Exception ignore) {
80+
// ignore
81+
}
6282
}
6383
this.isolationLevel = isolationLevel;
6484
this.commandPri = commandPri;
85+
this.isReplicaRead = isReplicaRead;
6586
}
6687

6788
private Region decodeRegion(Region region, boolean isRawRegion) {
@@ -89,7 +110,24 @@ private Region decodeRegion(Region region, boolean isRawRegion) {
89110
}
90111

91112
public Peer getLeader() {
92-
return peer;
113+
return leader;
114+
}
115+
116+
public Peer getCurrentFollower() {
117+
return meta.getPeers(followerIdx);
118+
}
119+
120+
public Peer getNextFollower() {
121+
int cnt = meta.getPeersCount();
122+
for (int retry = cnt - 1; retry > 0; retry--) {
123+
followerIdx = (followerIdx + 1) % cnt;
124+
Peer cur = meta.getPeers(followerIdx);
125+
if (cur.getIsLearner()) {
126+
continue;
127+
}
128+
return cur;
129+
}
130+
return leader;
93131
}
94132

95133
public List<Peer> getLearnerList() {
@@ -130,7 +168,18 @@ public Kvrpcpb.Context getContext(Set<Long> resolvedLocks) {
130168
Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder();
131169
builder.setIsolationLevel(this.isolationLevel);
132170
builder.setPriority(this.commandPri);
133-
builder.setRegionId(meta.getId()).setPeer(this.peer).setRegionEpoch(this.meta.getRegionEpoch());
171+
if (isReplicaRead) {
172+
builder
173+
.setRegionId(meta.getId())
174+
.setPeer(getCurrentFollower())
175+
.setReplicaRead(true)
176+
.setRegionEpoch(this.meta.getRegionEpoch());
177+
} else {
178+
builder
179+
.setRegionId(meta.getId())
180+
.setPeer(this.leader)
181+
.setRegionEpoch(this.meta.getRegionEpoch());
182+
}
134183
builder.addAllResolvedLocks(resolvedLocks);
135184
return builder.build();
136185
}
@@ -152,7 +201,7 @@ boolean switchPeer(long leaderStoreID) {
152201
List<Peer> peers = meta.getPeersList();
153202
for (Peer p : peers) {
154203
if (p.getStoreId() == leaderStoreID) {
155-
this.peer = p;
204+
this.leader = p;
156205
return true;
157206
}
158207
}
@@ -186,7 +235,7 @@ public boolean contains(ByteString key) {
186235
}
187236

188237
public boolean isValid() {
189-
return peer != null && meta != null;
238+
return leader != null && meta != null;
190239
}
191240

192241
public Metapb.RegionEpoch getRegionEpoch() {
@@ -204,14 +253,14 @@ public boolean equals(final Object another) {
204253
}
205254
TiRegion anotherRegion = ((TiRegion) another);
206255
return anotherRegion.meta.equals(this.meta)
207-
&& anotherRegion.peer.equals(this.peer)
256+
&& anotherRegion.leader.equals(this.leader)
208257
&& anotherRegion.commandPri.equals(this.commandPri)
209258
&& anotherRegion.isolationLevel.equals(this.isolationLevel);
210259
}
211260

212261
@Override
213262
public int hashCode() {
214-
return Objects.hash(meta, peer, isolationLevel, commandPri);
263+
return Objects.hash(meta, leader, isolationLevel, commandPri);
215264
}
216265

217266
@Override

src/main/java/org/tikv/txn/AbstractLockResolverClient.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.tikv.common.util.BackOffer;
3030
import org.tikv.common.util.ChannelFactory;
3131
import org.tikv.kvproto.Kvrpcpb;
32-
import org.tikv.kvproto.Metapb;
3332
import org.tikv.kvproto.TikvGrpc;
3433

3534
public interface AbstractLockResolverClient {
@@ -67,7 +66,7 @@ static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) {
6766
}
6867

6968
static AbstractLockResolverClient getInstance(
70-
Metapb.Store store,
69+
String storeVersion,
7170
TiConfiguration conf,
7271
TiRegion region,
7372
TikvGrpc.TikvBlockingStub blockingStub,
@@ -76,10 +75,10 @@ static AbstractLockResolverClient getInstance(
7675
RegionManager regionManager,
7776
PDClient pdClient,
7877
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
79-
if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V3) < 0) {
78+
if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V3) < 0) {
8079
return new LockResolverClientV2(
8180
conf, region, blockingStub, asyncStub, channelFactory, regionManager);
82-
} else if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V4) < 0) {
81+
} else if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V4) < 0) {
8382
return new LockResolverClientV3(
8483
conf,
8584
region,

src/test/java/org/tikv/raw/RawKVClientTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ private static ByteString getRandomValue() {
6969
@Before
7070
public void setup() throws IOException {
7171
try {
72-
session = TiSession.create(TiConfiguration.createRawDefault(DEFAULT_PD_ADDRESS));
72+
TiConfiguration conf = TiConfiguration.createRawDefault(DEFAULT_PD_ADDRESS);
73+
session = TiSession.create(conf);
7374
initialized = false;
7475
if (client == null) {
7576
client = session.createRawClient();

0 commit comments

Comments
 (0)