2323import static org .apache .hadoop .hbase .HConstants .USE_META_REPLICAS ;
2424import static org .apache .hadoop .hbase .HConstants .ZEROES ;
2525import static org .apache .hadoop .hbase .TableName .META_TABLE_NAME ;
26- import static org .apache .hadoop .hbase .client .AsyncRegionLocatorHelper .canUpdateOnError ;
2726import static org .apache .hadoop .hbase .client .AsyncRegionLocatorHelper .createRegionLocations ;
2827import static org .apache .hadoop .hbase .client .AsyncRegionLocatorHelper .isGood ;
29- import static org .apache .hadoop .hbase .client .AsyncRegionLocatorHelper .removeRegionLocation ;
3028import static org .apache .hadoop .hbase .client .ConnectionUtils .createClosestRowAfter ;
3129import static org .apache .hadoop .hbase .client .ConnectionUtils .isEmptyStopRow ;
3230import static org .apache .hadoop .hbase .client .RegionInfo .createRegionName ;
3331import static org .apache .hadoop .hbase .client .RegionLocator .LOCATOR_META_REPLICAS_MODE ;
34- import static org .apache .hadoop .hbase .util .Bytes .BYTES_COMPARATOR ;
3532import static org .apache .hadoop .hbase .util .ConcurrentMapUtils .computeIfAbsent ;
3633
3734import java .io .IOException ;
4744import java .util .concurrent .CompletableFuture ;
4845import java .util .concurrent .ConcurrentHashMap ;
4946import java .util .concurrent .ConcurrentMap ;
50- import java .util .concurrent .ConcurrentNavigableMap ;
51- import java .util .concurrent .ConcurrentSkipListMap ;
5247import java .util .concurrent .TimeUnit ;
5348import org .apache .commons .lang3 .ObjectUtils ;
5449import org .apache .hadoop .hbase .CatalogReplicaMode ;
6661import org .slf4j .Logger ;
6762import org .slf4j .LoggerFactory ;
6863
69- import org .apache .hbase .thirdparty .com .google .common .base .Objects ;
70-
7164/**
7265 * The asynchronous locator for regions other than meta.
7366 */
@@ -146,13 +139,15 @@ public void complete() {
146139
147140 private static final class TableCache {
148141
149- private final ConcurrentNavigableMap <byte [], RegionLocations > cache =
150- new ConcurrentSkipListMap <>(BYTES_COMPARATOR );
151-
152142 private final Set <LocateRequest > pendingRequests = new HashSet <>();
153143
154144 private final Map <LocateRequest , CompletableFuture <RegionLocations >> allRequests =
155145 new LinkedHashMap <>();
146+ private final AsyncRegionLocationCache regionLocationCache ;
147+
148+ public TableCache (TableName tableName ) {
149+ regionLocationCache = new AsyncRegionLocationCache (tableName );
150+ }
156151
157152 public boolean hasQuota (int max ) {
158153 return pendingRequests .size () < max ;
@@ -262,76 +257,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
262257 }
263258
264259 private TableCache getTableCache (TableName tableName ) {
265- return computeIfAbsent (cache , tableName , TableCache ::new );
266- }
267-
268- private boolean isEqual (RegionLocations locs1 , RegionLocations locs2 ) {
269- HRegionLocation [] locArr1 = locs1 .getRegionLocations ();
270- HRegionLocation [] locArr2 = locs2 .getRegionLocations ();
271- if (locArr1 .length != locArr2 .length ) {
272- return false ;
273- }
274- for (int i = 0 ; i < locArr1 .length ; i ++) {
275- // do not need to compare region info
276- HRegionLocation loc1 = locArr1 [i ];
277- HRegionLocation loc2 = locArr2 [i ];
278- if (loc1 == null ) {
279- if (loc2 != null ) {
280- return false ;
281- }
282- } else {
283- if (loc2 == null ) {
284- return false ;
285- }
286- if (loc1 .getSeqNum () != loc2 .getSeqNum ()) {
287- return false ;
288- }
289- if (!Objects .equal (loc1 .getServerName (), loc2 .getServerName ())) {
290- return false ;
291- }
292- }
293- }
294- return true ;
295- }
296-
297- // if we successfully add the locations to cache, return the locations, otherwise return the one
298- // which prevents us being added. The upper layer can use this value to complete pending requests.
299- private RegionLocations addToCache (TableCache tableCache , RegionLocations locs ) {
300- LOG .trace ("Try adding {} to cache" , locs );
301- byte [] startKey = locs .getRegionLocation ().getRegion ().getStartKey ();
302- for (;;) {
303- RegionLocations oldLocs = tableCache .cache .putIfAbsent (startKey , locs );
304- if (oldLocs == null ) {
305- return locs ;
306- }
307- // check whether the regions are the same, this usually happens when table is split/merged, or
308- // deleted and recreated again.
309- RegionInfo region = locs .getRegionLocation ().getRegion ();
310- RegionInfo oldRegion = oldLocs .getRegionLocation ().getRegion ();
311- if (region .getEncodedName ().equals (oldRegion .getEncodedName ())) {
312- RegionLocations mergedLocs = oldLocs .mergeLocations (locs );
313- if (isEqual (mergedLocs , oldLocs )) {
314- // the merged one is the same with the old one, give up
315- LOG .trace ("Will not add {} to cache because the old value {} "
316- + " is newer than us or has the same server name."
317- + " Maybe it is updated before we replace it" , locs , oldLocs );
318- return oldLocs ;
319- }
320- if (tableCache .cache .replace (startKey , oldLocs , mergedLocs )) {
321- return mergedLocs ;
322- }
323- } else {
324- // the region is different, here we trust the one we fetched. This maybe wrong but finally
325- // the upper layer can detect this and trigger removal of the wrong locations
326- if (LOG .isDebugEnabled ()) {
327- LOG .debug ("The newnly fetch region {} is different from the old one {} for row '{}',"
328- + " try replaing the old one..." , region , oldRegion , Bytes .toStringBinary (startKey ));
329- }
330- if (tableCache .cache .replace (startKey , oldLocs , locs )) {
331- return locs ;
332- }
333- }
334- }
260+ return computeIfAbsent (cache , tableName , () -> new TableCache (tableName ));
335261 }
336262
337263 private void complete (TableName tableName , LocateRequest req , RegionLocations locs ,
@@ -343,7 +269,7 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
343269 Optional <LocateRequest > toSend = Optional .empty ();
344270 TableCache tableCache = getTableCache (tableName );
345271 if (locs != null ) {
346- RegionLocations addedLocs = addToCache ( tableCache , locs );
272+ RegionLocations addedLocs = tableCache . regionLocationCache . add ( locs );
347273 List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
348274 synchronized (tableCache ) {
349275 tableCache .pendingRequests .remove (req );
@@ -421,62 +347,24 @@ private void recordCacheMiss() {
421347 conn .getConnectionMetrics ().ifPresent (MetricsConnection ::incrMetaCacheMiss );
422348 }
423349
424- private RegionLocations locateRowInCache (TableCache tableCache , TableName tableName , byte [] row ,
425- int replicaId ) {
426- Map .Entry <byte [], RegionLocations > entry = tableCache .cache .floorEntry (row );
427- if (entry == null ) {
428- recordCacheMiss ();
429- return null ;
430- }
431- RegionLocations locs = entry .getValue ();
432- HRegionLocation loc = locs .getRegionLocation (replicaId );
433- if (loc == null ) {
350+ private RegionLocations locateRowInCache (TableCache tableCache , byte [] row , int replicaId ) {
351+ RegionLocations locs = tableCache .regionLocationCache .findForRow (row , replicaId );
352+ if (locs == null ) {
434353 recordCacheMiss ();
435- return null ;
436- }
437- byte [] endKey = loc .getRegion ().getEndKey ();
438- if (isEmptyStopRow (endKey ) || Bytes .compareTo (row , endKey ) < 0 ) {
439- if (LOG .isTraceEnabled ()) {
440- LOG .trace ("Found {} in cache for {}, row='{}', locateType={}, replicaId={}" , loc , tableName ,
441- Bytes .toStringBinary (row ), RegionLocateType .CURRENT , replicaId );
442- }
443- recordCacheHit ();
444- return locs ;
445354 } else {
446- recordCacheMiss ();
447- return null ;
355+ recordCacheHit ();
448356 }
357+ return locs ;
449358 }
450359
451- private RegionLocations locateRowBeforeInCache (TableCache tableCache , TableName tableName ,
452- byte [] row , int replicaId ) {
453- boolean isEmptyStopRow = isEmptyStopRow (row );
454- Map .Entry <byte [], RegionLocations > entry =
455- isEmptyStopRow ? tableCache .cache .lastEntry () : tableCache .cache .lowerEntry (row );
456- if (entry == null ) {
360+ private RegionLocations locateRowBeforeInCache (TableCache tableCache , byte [] row , int replicaId ) {
361+ RegionLocations locs = tableCache .regionLocationCache .findForBeforeRow (row , replicaId );
362+ if (locs == null ) {
457363 recordCacheMiss ();
458- return null ;
459- }
460- RegionLocations locs = entry .getValue ();
461- HRegionLocation loc = locs .getRegionLocation (replicaId );
462- if (loc == null ) {
463- recordCacheMiss ();
464- return null ;
465- }
466- if (
467- isEmptyStopRow (loc .getRegion ().getEndKey ())
468- || (!isEmptyStopRow && Bytes .compareTo (loc .getRegion ().getEndKey (), row ) >= 0 )
469- ) {
470- if (LOG .isTraceEnabled ()) {
471- LOG .trace ("Found {} in cache for {}, row='{}', locateType={}, replicaId={}" , loc , tableName ,
472- Bytes .toStringBinary (row ), RegionLocateType .BEFORE , replicaId );
473- }
474- recordCacheHit ();
475- return locs ;
476364 } else {
477- recordCacheMiss ();
478- return null ;
365+ recordCacheHit ();
479366 }
367+ return locs ;
480368 }
481369
482370 private void locateInMeta (TableName tableName , LocateRequest req ) {
@@ -570,7 +458,7 @@ public void onNext(Result[] results, ScanController controller) {
570458 if (info == null || info .isOffline () || info .isSplitParent ()) {
571459 continue ;
572460 }
573- RegionLocations addedLocs = addToCache ( tableCache , locs );
461+ RegionLocations addedLocs = tableCache . regionLocationCache . add ( locs );
574462 List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
575463 synchronized (tableCache ) {
576464 futureResultList .addAll (tableCache .clearCompletedRequests (addedLocs ));
@@ -582,11 +470,11 @@ public void onNext(Result[] results, ScanController controller) {
582470 });
583471 }
584472
585- private RegionLocations locateInCache (TableCache tableCache , TableName tableName , byte [] row ,
586- int replicaId , RegionLocateType locateType ) {
473+ private RegionLocations locateInCache (TableCache tableCache , byte [] row , int replicaId ,
474+ RegionLocateType locateType ) {
587475 return locateType .equals (RegionLocateType .BEFORE )
588- ? locateRowBeforeInCache (tableCache , tableName , row , replicaId )
589- : locateRowInCache (tableCache , tableName , row , replicaId );
476+ ? locateRowBeforeInCache (tableCache , row , replicaId )
477+ : locateRowInCache (tableCache , row , replicaId );
590478 }
591479
592480 // locateToPrevious is true means we will use the start key of a region to locate the region
@@ -598,7 +486,7 @@ private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName
598486 assert !locateType .equals (RegionLocateType .AFTER );
599487 TableCache tableCache = getTableCache (tableName );
600488 if (!reload ) {
601- RegionLocations locs = locateInCache (tableCache , tableName , row , replicaId , locateType );
489+ RegionLocations locs = locateInCache (tableCache , row , replicaId , locateType );
602490 if (isGood (locs , replicaId )) {
603491 return CompletableFuture .completedFuture (locs );
604492 }
@@ -609,7 +497,7 @@ private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName
609497 synchronized (tableCache ) {
610498 // check again
611499 if (!reload ) {
612- RegionLocations locs = locateInCache (tableCache , tableName , row , replicaId , locateType );
500+ RegionLocations locs = locateInCache (tableCache , row , replicaId , locateType );
613501 if (isGood (locs , replicaId )) {
614502 return CompletableFuture .completedFuture (locs );
615503 }
@@ -648,51 +536,33 @@ private void recordClearRegionCache() {
648536
649537 private void removeLocationFromCache (HRegionLocation loc ) {
650538 TableCache tableCache = cache .get (loc .getRegion ().getTable ());
651- if (tableCache == null ) {
652- return ;
653- }
654- byte [] startKey = loc .getRegion ().getStartKey ();
655- for (;;) {
656- RegionLocations oldLocs = tableCache .cache .get (startKey );
657- if (oldLocs == null ) {
658- return ;
659- }
660- HRegionLocation oldLoc = oldLocs .getRegionLocation (loc .getRegion ().getReplicaId ());
661- if (!canUpdateOnError (loc , oldLoc )) {
662- return ;
663- }
664- // Tell metaReplicaSelector that the location is stale. It will create a stale entry
665- // with timestamp internally. Next time the client looks up the same location,
666- // it will pick a different meta replica region.
667- if (this .metaReplicaMode == CatalogReplicaMode .LOAD_BALANCE ) {
668- metaReplicaSelector .onError (loc );
539+ if (tableCache != null ) {
540+ if (tableCache .regionLocationCache .remove (loc )) {
541+ recordClearRegionCache ();
542+ updateMetaReplicaSelector (loc );
669543 }
544+ }
545+ }
670546
671- RegionLocations newLocs = removeRegionLocation (oldLocs , loc .getRegion ().getReplicaId ());
672- if (newLocs == null ) {
673- if (tableCache .cache .remove (startKey , oldLocs )) {
674- recordClearRegionCache ();
675- return ;
676- }
677- } else {
678- if (tableCache .cache .replace (startKey , oldLocs , newLocs )) {
679- recordClearRegionCache ();
680- return ;
681- }
682- }
547+ private void updateMetaReplicaSelector (HRegionLocation loc ) {
548+ // Tell metaReplicaSelector that the location is stale. It will create a stale entry
549+ // with timestamp internally. Next time the client looks up the same location,
550+ // it will pick a different meta replica region.
551+ if (metaReplicaMode == CatalogReplicaMode .LOAD_BALANCE ) {
552+ metaReplicaSelector .onError (loc );
683553 }
684554 }
685555
686556 void addLocationToCache (HRegionLocation loc ) {
687- addToCache ( getTableCache (loc .getRegion ().getTable ()), createRegionLocations (loc ));
557+ getTableCache (loc .getRegion ().getTable ()). regionLocationCache . add ( createRegionLocations (loc ));
688558 }
689559
690560 private HRegionLocation getCachedLocation (HRegionLocation loc ) {
691561 TableCache tableCache = cache .get (loc .getRegion ().getTable ());
692562 if (tableCache == null ) {
693563 return null ;
694564 }
695- RegionLocations locs = tableCache .cache .get (loc .getRegion ().getStartKey ());
565+ RegionLocations locs = tableCache .regionLocationCache .get (loc .getRegion ().getStartKey ());
696566 return locs != null ? locs .getRegionLocation (loc .getRegion ().getReplicaId ()) : null ;
697567 }
698568
@@ -717,8 +587,8 @@ void clearCache(TableName tableName) {
717587 }
718588 }
719589 futureResultList .forEach (RegionLocationsFutureResult ::complete );
720- conn .getConnectionMetrics ()
721- . ifPresent ( metrics -> metrics .incrMetaCacheNumClearRegion (tableCache .cache .size ()));
590+ conn .getConnectionMetrics (). ifPresent (
591+ metrics -> metrics .incrMetaCacheNumClearRegion (tableCache .regionLocationCache .size ()));
722592 }
723593
724594 void clearCache () {
@@ -727,19 +597,7 @@ void clearCache() {
727597
728598 void clearCache (ServerName serverName ) {
729599 for (TableCache tableCache : cache .values ()) {
730- for (Map .Entry <byte [], RegionLocations > entry : tableCache .cache .entrySet ()) {
731- byte [] regionName = entry .getKey ();
732- RegionLocations locs = entry .getValue ();
733- RegionLocations newLocs = locs .removeByServer (serverName );
734- if (locs == newLocs ) {
735- continue ;
736- }
737- if (newLocs .isEmpty ()) {
738- tableCache .cache .remove (regionName , locs );
739- } else {
740- tableCache .cache .replace (regionName , locs , newLocs );
741- }
742- }
600+ tableCache .regionLocationCache .removeForServer (serverName );
743601 }
744602 }
745603
@@ -749,6 +607,7 @@ RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
749607 if (tableCache == null ) {
750608 return null ;
751609 }
752- return locateRowInCache (tableCache , tableName , row , RegionReplicaUtil .DEFAULT_REPLICA_ID );
610+ return locateRowInCache (tableCache , row , RegionReplicaUtil .DEFAULT_REPLICA_ID );
753611 }
612+
754613}
0 commit comments