Skip to content

Commit 5dc4dec

Browse files
author
Jiaming Lu
committed
Implement reverseScan
1 parent d278e3a commit 5dc4dec

File tree

8 files changed

+149
-37
lines changed

8 files changed

+149
-37
lines changed

src/main/java/org/tikv/common/KVClient.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,25 @@ public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long ve
101101
*/
102102
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
103103
throws GrpcException {
104-
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, endKey, version);
104+
return scan(startKey, endKey, version, false);
105+
}
106+
107+
/**
108+
* Scan key-value pairs from TiKV in range [startKey, endKey) or if reversely, [endKey, startKey)
109+
*
110+
* @param startKey start key, inclusive
111+
* @param endKey end key, exclusive
112+
* @param reverse whether to scan reversely
113+
* @return list of key-value pairs in range
114+
*/
115+
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version, boolean reverse)
116+
throws GrpcException {
117+
Iterator<KvPair> iterator;
118+
if (reverse) {
119+
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
120+
} else {
121+
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
122+
}
105123
List<KvPair> result = new ArrayList<>();
106124
iterator.forEachRemaining(result::add);
107125
return result;
@@ -115,7 +133,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
115133
* @return list of key-value pairs in range
116134
*/
117135
public List<KvPair> scan(ByteString startKey, long version, int limit) throws GrpcException {
118-
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
136+
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit, false);
119137
List<KvPair> result = new ArrayList<>();
120138
iterator.forEachRemaining(result::add);
121139
return result;
@@ -183,16 +201,18 @@ private Iterator<KvPair> scanIterator(
183201
RegionStoreClientBuilder builder,
184202
ByteString startKey,
185203
ByteString endKey,
186-
long version) {
187-
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
204+
long version,
205+
boolean reverse) {
206+
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
188207
}
189208

190209
private Iterator<KvPair> scanIterator(
191210
TiConfiguration conf,
192211
RegionStoreClientBuilder builder,
193212
ByteString startKey,
194213
long version,
195-
int limit) {
196-
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
214+
int limit,
215+
boolean reverse) {
216+
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
197217
}
198218
}

src/main/java/org/tikv/common/Snapshot.java

+36-2
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,24 @@ public Iterator<KvPair> scan(ByteString startKey) {
157157
session.getRegionStoreClientBuilder(),
158158
startKey,
159159
timestamp.getVersion(),
160-
Integer.MAX_VALUE);
160+
Integer.MAX_VALUE,
161+
false);
162+
}
163+
164+
/**
165+
* scan all keys becofe startKey, inclusive
166+
*
167+
* @param startKey start of keys
168+
* @return iterator of kvPair
169+
*/
170+
public Iterator<KvPair> reverseScan(ByteString startKey) {
171+
return new ConcreteScanIterator(
172+
session.getConf(),
173+
session.getRegionStoreClientBuilder(),
174+
startKey,
175+
timestamp.getVersion(),
176+
Integer.MAX_VALUE,
177+
true);
161178
}
162179

163180
/**
@@ -173,7 +190,24 @@ public Iterator<KvPair> scanPrefix(ByteString prefix) {
173190
session.getRegionStoreClientBuilder(),
174191
prefix,
175192
nextPrefix,
176-
timestamp.getVersion());
193+
timestamp.getVersion(),
194+
false);
195+
}
196+
/**
197+
* scan all keys with prefix, reversely
198+
*
199+
* @param prefix prefix of keys
200+
* @return iterator of kvPair
201+
*/
202+
public Iterator<KvPair> reverseScanPrefix(ByteString prefix) {
203+
ByteString nextPrefix = Key.toRawKey(prefix).nextPrefix().toByteString();
204+
return new ConcreteScanIterator(
205+
session.getConf(),
206+
session.getRegionStoreClientBuilder(),
207+
nextPrefix,
208+
prefix,
209+
timestamp.getVersion(),
210+
true);
177211
}
178212

179213
public TiConfiguration getConf() {

src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,21 @@ public ConcreteScanIterator(
4444
RegionStoreClientBuilder builder,
4545
ByteString startKey,
4646
long version,
47-
int limit) {
47+
int limit,
48+
boolean reverse) {
4849
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
49-
this(conf, builder, startKey, ByteString.EMPTY, version, limit);
50+
this(conf, builder, startKey, ByteString.EMPTY, version, limit, reverse);
5051
}
5152

5253
public ConcreteScanIterator(
5354
TiConfiguration conf,
5455
RegionStoreClientBuilder builder,
5556
ByteString startKey,
5657
ByteString endKey,
57-
long version) {
58+
long version,
59+
boolean reverse) {
5860
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
59-
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE);
61+
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE, reverse);
6062
}
6163

6264
private ConcreteScanIterator(
@@ -65,8 +67,9 @@ private ConcreteScanIterator(
6567
ByteString startKey,
6668
ByteString endKey,
6769
long version,
68-
int limit) {
69-
super(conf, builder, startKey, endKey, limit, false);
70+
int limit,
71+
boolean reverse) {
72+
super(conf, builder, startKey, endKey, limit, false, reverse);
7073
this.version = version;
7174
}
7275

@@ -76,7 +79,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
7679
try (RegionStoreClient client = builder.build(startKey)) {
7780
client.setTimeout(conf.getScanTimeout());
7881
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
79-
currentCache = client.scan(backOffer, startKey, version);
82+
currentCache = client.scan(backOffer, startKey, version, reverse);
8083
// If we get region before scan, we will use region from cache which
8184
// may have wrong end key. This may miss some regions that split from old region.
8285
// Client will get the newest region during scan. So we need to

src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public RawScanIterator(
3939
ByteString endKey,
4040
int limit,
4141
boolean keyOnly,
42+
boolean reverse,
4243
BackOffer scanBackOffer) {
43-
super(conf, builder, startKey, endKey, limit, keyOnly);
44+
super(conf, builder, startKey, endKey, limit, keyOnly, reverse);
4445

4546
this.scanBackOffer = scanBackOffer;
4647
}
@@ -56,7 +57,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
5657
currentCache = null;
5758
} else {
5859
try {
59-
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly);
60+
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly, reverse);
6061
// Client will get the newest region during scan. So we need to
6162
// update region after scan.
6263
region = client.getRegion();

src/main/java/org/tikv/common/operation/iterator/ScanIterator.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,24 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
4343
protected Key endKey;
4444
protected boolean hasEndKey;
4545
protected boolean processingLastBatch = false;
46+
protected boolean reverse = false;
4647

4748
ScanIterator(
4849
TiConfiguration conf,
4950
RegionStoreClientBuilder builder,
5051
ByteString startKey,
5152
ByteString endKey,
5253
int limit,
53-
boolean keyOnly) {
54+
boolean keyOnly,
55+
boolean reverse) {
5456
this.startKey = requireNonNull(startKey, "start key is null");
5557
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
5658
this.hasEndKey = !endKey.isEmpty();
5759
this.limit = limit;
5860
this.keyOnly = keyOnly;
5961
this.conf = conf;
6062
this.builder = builder;
63+
this.reverse = reverse;
6164
}
6265

6366
/**

src/main/java/org/tikv/common/region/RegionStoreClient.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@
7070
import org.tikv.common.util.HistogramUtils;
7171
import org.tikv.common.util.Pair;
7272
import org.tikv.common.util.RangeSplitter;
73-
import org.tikv.kvproto.Coprocessor;
74-
import org.tikv.kvproto.Errorpb;
73+
import org.tikv.kvproto.*;
7574
import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
7675
import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
7776
import org.tikv.kvproto.Kvrpcpb.CommitRequest;
@@ -109,8 +108,6 @@
109108
import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse;
110109
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
111110
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
112-
import org.tikv.kvproto.Metapb;
113-
import org.tikv.kvproto.TikvGrpc;
114111
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
115112
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
116113
import org.tikv.txn.AbstractLockResolverClient;
@@ -336,7 +333,7 @@ private List<KvPair> handleBatchGetResponse(
336333
}
337334

338335
public List<KvPair> scan(
339-
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
336+
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly, boolean reverse) {
340337
boolean forWrite = false;
341338
while (true) {
342339
Supplier<ScanRequest> request =
@@ -348,6 +345,7 @@ public List<KvPair> scan(
348345
.setStartKey(codec.encodeKey(startKey))
349346
.setVersion(version)
350347
.setKeyOnly(keyOnly)
348+
.setReverse(reverse)
351349
.setLimit(getConf().getScanBatchSize())
352350
.build();
353351

@@ -417,6 +415,11 @@ public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version)
417415
return scan(backOffer, startKey, version, false);
418416
}
419417

418+
public List<KvPair> scan(
419+
BackOffer backOffer, ByteString startKey, long version, boolean reverse) {
420+
return scan(backOffer, startKey, version, false, reverse);
421+
}
422+
420423
/**
421424
* Prewrite batch keys
422425
*
@@ -1238,9 +1241,11 @@ private void handleRawBatchDelete(RawBatchDeleteResponse resp) {
12381241
* @param backOffer BackOffer
12391242
* @param key startKey
12401243
* @param keyOnly true if value of KvPair is not needed
1244+
* @param reverse
12411245
* @return KvPair list
12421246
*/
1243-
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
1247+
public List<KvPair> rawScan(
1248+
BackOffer backOffer, ByteString key, int limit, boolean keyOnly, boolean reverse) {
12441249
Long clusterId = pdClient.getClusterId();
12451250
Histogram.Timer requestTimer =
12461251
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer();
@@ -1254,6 +1259,7 @@ public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, bool
12541259
.setEndKey(range.second)
12551260
.setKeyOnly(keyOnly)
12561261
.setLimit(limit)
1262+
.setReverse(reverse)
12571263
.build();
12581264
};
12591265

@@ -1271,8 +1277,9 @@ public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, bool
12711277
}
12721278
}
12731279

1274-
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
1275-
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly);
1280+
public List<KvPair> rawScan(
1281+
BackOffer backOffer, ByteString key, boolean keyOnly, boolean reverse) {
1282+
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly, reverse);
12761283
}
12771284

12781285
private List<KvPair> rawScanHelper(RawScanResponse resp) {

src/main/java/org/tikv/raw/RawKVClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,7 @@ private Iterator<KvPair> rawScanIterator(
10111011
if (limit > MAX_RAW_SCAN_LIMIT) {
10121012
throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
10131013
}
1014-
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer);
1014+
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, false, backOffer);
10151015
}
10161016

10171017
/**

src/main/java/org/tikv/txn/KVClient.java

+54-10
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,37 @@ public List<Kvrpcpb.KvPair> batchGet(BackOffer backOffer, List<ByteString> keys,
114114
*/
115115
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long version)
116116
throws GrpcException {
117+
return scan(startKey, endKey, version, false);
118+
}
119+
/**
120+
* Scan key-value pairs from TiKV reversely in range (startKey, endKey]
121+
*
122+
* @param startKey start key, inclusive
123+
* @param endKey end key, exclusive
124+
* @return list of key-value pairs in range
125+
*/
126+
public List<Kvrpcpb.KvPair> reverseScan(ByteString startKey, ByteString endKey, long version)
127+
throws GrpcException {
128+
return scan(endKey, startKey, version, true);
129+
}
130+
131+
public List<Kvrpcpb.KvPair> scan(
132+
ByteString startKey, ByteString endKey, long version, boolean reverse) throws GrpcException {
133+
Iterator<Kvrpcpb.KvPair> iterator;
134+
if (reverse) {
135+
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
136+
} else {
137+
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
138+
}
139+
List<Kvrpcpb.KvPair> result = new ArrayList<>();
140+
iterator.forEachRemaining(result::add);
141+
return result;
142+
}
143+
144+
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit, boolean reverse)
145+
throws GrpcException {
117146
Iterator<Kvrpcpb.KvPair> iterator =
118-
scanIterator(conf, clientBuilder, startKey, endKey, version);
147+
scanIterator(conf, clientBuilder, startKey, version, limit, reverse);
119148
List<Kvrpcpb.KvPair> result = new ArrayList<>();
120149
iterator.forEachRemaining(result::add);
121150
return result;
@@ -130,14 +159,27 @@ public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long ve
130159
*/
131160
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit)
132161
throws GrpcException {
133-
Iterator<Kvrpcpb.KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
134-
List<Kvrpcpb.KvPair> result = new ArrayList<>();
135-
iterator.forEachRemaining(result::add);
136-
return result;
162+
return scan(startKey, version, limit, false);
163+
}
164+
165+
/**
166+
* Scan key-value pairs reversively from TiKV in range ('', endKey], maximum to `limit` pairs
167+
*
168+
* @param endKey start key, inclusive
169+
* @param limit limit of kv pairs
170+
* @return list of key-value pairs in range
171+
*/
172+
public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version, int limit)
173+
throws GrpcException {
174+
return scan(endKey, version, limit, true);
137175
}
138176

139177
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version) throws GrpcException {
140-
return scan(startKey, version, Integer.MAX_VALUE);
178+
return scan(startKey, version, Integer.MAX_VALUE, false);
179+
}
180+
181+
public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version) throws GrpcException {
182+
return scan(endKey, version, Integer.MAX_VALUE, true);
141183
}
142184

143185
public synchronized void ingest(List<Pair<ByteString, ByteString>> list) throws GrpcException {
@@ -264,17 +306,19 @@ private Iterator<Kvrpcpb.KvPair> scanIterator(
264306
RegionStoreClientBuilder builder,
265307
ByteString startKey,
266308
ByteString endKey,
267-
long version) {
268-
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
309+
long version,
310+
boolean reverse) {
311+
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
269312
}
270313

271314
private Iterator<Kvrpcpb.KvPair> scanIterator(
272315
TiConfiguration conf,
273316
RegionStoreClientBuilder builder,
274317
ByteString startKey,
275318
long version,
276-
int limit) {
277-
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
319+
int limit,
320+
boolean reverse) {
321+
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
278322
}
279323

280324
private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList)

0 commit comments

Comments
 (0)