35
35
import static org .apache .hadoop .hbase .util .ConcurrentMapUtils .computeIfAbsent ;
36
36
37
37
import java .io .IOException ;
38
- import java .util .ArrayList ;
39
38
import java .util .Arrays ;
40
39
import java .util .HashSet ;
41
40
import java .util .Iterator ;
42
41
import java .util .LinkedHashMap ;
43
- import java .util .List ;
44
42
import java .util .Map ;
45
43
import java .util .Optional ;
46
44
import java .util .Set ;
@@ -124,26 +122,6 @@ public boolean equals(Object obj) {
124
122
}
125
123
}
126
124
127
- private static final class RegionLocationsFutureResult {
128
- private final CompletableFuture <RegionLocations > future ;
129
- private final RegionLocations result ;
130
- private final Throwable e ;
131
-
132
- public RegionLocationsFutureResult (CompletableFuture <RegionLocations > future ,
133
- RegionLocations result , Throwable e ) {
134
- this .future = future ;
135
- this .result = result ;
136
- this .e = e ;
137
- }
138
-
139
- public void complete () {
140
- if (e != null ) {
141
- future .completeExceptionally (e );
142
- }
143
- future .complete (result );
144
- }
145
- }
146
-
147
125
private static final class TableCache {
148
126
149
127
private final ConcurrentNavigableMap <byte [], RegionLocations > cache =
@@ -170,20 +148,18 @@ public Optional<LocateRequest> getCandidate() {
170
148
return allRequests .keySet ().stream ().filter (r -> !isPending (r )).findFirst ();
171
149
}
172
150
173
- public List <RegionLocationsFutureResult > clearCompletedRequests (RegionLocations locations ) {
174
- List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
151
+ public void clearCompletedRequests (RegionLocations locations ) {
175
152
for (Iterator <Map .Entry <LocateRequest , CompletableFuture <RegionLocations >>> iter =
176
153
allRequests .entrySet ().iterator (); iter .hasNext ();) {
177
154
Map .Entry <LocateRequest , CompletableFuture <RegionLocations >> entry = iter .next ();
178
- if (tryComplete (entry .getKey (), entry .getValue (), locations , futureResultList )) {
155
+ if (tryComplete (entry .getKey (), entry .getValue (), locations )) {
179
156
iter .remove ();
180
157
}
181
158
}
182
- return futureResultList ;
183
159
}
184
160
185
161
private boolean tryComplete (LocateRequest req , CompletableFuture <RegionLocations > future ,
186
- RegionLocations locations , List < RegionLocationsFutureResult > futureResultList ) {
162
+ RegionLocations locations ) {
187
163
if (future .isDone ()) {
188
164
return true ;
189
165
}
@@ -209,7 +185,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
209
185
completed = loc .getRegion ().containsRow (req .row );
210
186
}
211
187
if (completed ) {
212
- futureResultList . add ( new RegionLocationsFutureResult ( future , locations , null ) );
188
+ future . complete ( locations );
213
189
return true ;
214
190
} else {
215
191
return false ;
@@ -343,36 +319,32 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
343
319
TableCache tableCache = getTableCache (tableName );
344
320
if (locs != null ) {
345
321
RegionLocations addedLocs = addToCache (tableCache , locs );
346
- List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
347
322
synchronized (tableCache ) {
348
323
tableCache .pendingRequests .remove (req );
349
- futureResultList . addAll ( tableCache .clearCompletedRequests (addedLocs ) );
324
+ tableCache .clearCompletedRequests (addedLocs );
350
325
// Remove a complete locate request in a synchronized block, so the table cache must have
351
326
// quota to send a candidate request.
352
327
toSend = tableCache .getCandidate ();
353
328
toSend .ifPresent (r -> tableCache .send (r ));
354
329
}
355
- futureResultList .forEach (RegionLocationsFutureResult ::complete );
356
330
toSend .ifPresent (r -> locateInMeta (tableName , r ));
357
331
} else {
358
332
// we meet an error
359
333
assert error != null ;
360
- List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
361
334
synchronized (tableCache ) {
362
335
tableCache .pendingRequests .remove (req );
363
336
// fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
364
337
// already retried several times
365
- CompletableFuture <RegionLocations > future = tableCache .allRequests .remove (req );
338
+ CompletableFuture <? > future = tableCache .allRequests .remove (req );
366
339
if (future != null ) {
367
- futureResultList . add ( new RegionLocationsFutureResult ( future , null , error ) );
340
+ future . completeExceptionally ( error );
368
341
}
369
- futureResultList . addAll ( tableCache .clearCompletedRequests (null ) );
342
+ tableCache .clearCompletedRequests (null );
370
343
// Remove a complete locate request in a synchronized block, so the table cache must have
371
344
// quota to send a candidate request.
372
345
toSend = tableCache .getCandidate ();
373
346
toSend .ifPresent (r -> tableCache .send (r ));
374
347
}
375
- futureResultList .forEach (RegionLocationsFutureResult ::complete );
376
348
toSend .ifPresent (r -> locateInMeta (tableName , r ));
377
349
}
378
350
}
@@ -570,11 +542,9 @@ public void onNext(Result[] results, ScanController controller) {
570
542
continue ;
571
543
}
572
544
RegionLocations addedLocs = addToCache (tableCache , locs );
573
- List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
574
545
synchronized (tableCache ) {
575
- futureResultList . addAll ( tableCache .clearCompletedRequests (addedLocs ) );
546
+ tableCache .clearCompletedRequests (addedLocs );
576
547
}
577
- futureResultList .forEach (RegionLocationsFutureResult ::complete );
578
548
}
579
549
}
580
550
}
@@ -706,16 +676,12 @@ void clearCache(TableName tableName) {
706
676
if (tableCache == null ) {
707
677
return ;
708
678
}
709
- List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
710
679
synchronized (tableCache ) {
711
680
if (!tableCache .allRequests .isEmpty ()) {
712
681
IOException error = new IOException ("Cache cleared" );
713
- tableCache .allRequests .values ().forEach (f -> {
714
- futureResultList .add (new RegionLocationsFutureResult (f , null , error ));
715
- });
682
+ tableCache .allRequests .values ().forEach (f -> f .completeExceptionally (error ));
716
683
}
717
684
}
718
- futureResultList .forEach (RegionLocationsFutureResult ::complete );
719
685
conn .getConnectionMetrics ()
720
686
.ifPresent (metrics -> metrics .incrMetaCacheNumClearRegion (tableCache .cache .size ()));
721
687
}
0 commit comments