@@ -144,6 +144,8 @@ public class BucketCache implements BlockCache, HeapSize {
   // In this map, store the block's meta data like offset, length
   transient Map<BlockCacheKey, BucketEntry> backingMap;
 
+  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);
+
   /** Set of files for which prefetch is completed */
   final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>();
 
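The new `backingMapValidated` flag lets readers avoid trusting recovered cache entries before the integrity check (added further down, in `parsePB`) has finished. A minimal standalone sketch of the flag pattern, with illustrative names not taken from the patch:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the validation-flag pattern; class and method names are illustrative.
class RecoveredState {
  private final AtomicBoolean validated = new AtomicBoolean(false);

  // Called by the recovery/validation path once recovered entries are trustworthy.
  void markValidated() {
    validated.set(true); // AtomicBoolean.set has volatile write semantics
  }

  // Readers consult the flag before trusting recovered entries.
  boolean isSafeToServe() {
    return validated.get();
  }
}
```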
@@ -301,7 +303,6 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
 
     this.allocFailLogPrevTs = 0;
 
-    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
     for (int i = 0; i < writerThreads.length; ++i) {
       writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
     }
@@ -318,10 +319,14 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
       try {
         retrieveFromFile(bucketSizes);
       } catch (IOException ioex) {
+        LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex);
         backingMap.clear();
         fullyCachedFiles.clear();
-        LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
+        backingMapValidated.set(true);
+        bucketAllocator = new BucketAllocator(capacity, bucketSizes);
       }
+    } else {
+      bucketAllocator = new BucketAllocator(capacity, bucketSizes);
     }
     final String threadName = Thread.currentThread().getName();
     this.cacheEnabled = true;
@@ -374,6 +379,7 @@ protected void startWriterThreads() {
   }
 
   void startBucketCachePersisterThread() {
+    LOG.info("Starting BucketCachePersisterThread");
     cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
     cachePersister.setDaemon(true);
     cachePersister.start();
@@ -529,6 +535,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
     } else {
       this.blockNumber.increment();
       this.heapSize.add(cachedItem.heapSize());
+      blocksByHFile.add(cacheKey);
     }
   }
 
@@ -589,6 +596,7 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
         // the cache map state might differ from the actual cache. If we reach this block,
         // we should remove the cache key entry from the backing map
         backingMap.remove(key);
+        fullyCachedFiles.remove(key.getHfileName());
         LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
       } catch (IOException ioex) {
         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
@@ -684,6 +692,7 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
     } else {
       return bucketEntryToUse.withWriteLock(offsetLock, () -> {
         if (backingMap.remove(cacheKey, bucketEntryToUse)) {
+          LOG.debug("removed key {} from back map in the evict process", cacheKey);
           blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
           return true;
         }
@@ -1255,30 +1264,43 @@ static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
       justification = "false positive, try-with-resources ensures close is called.")
   void persistToFile() throws IOException {
+    LOG.debug("Thread {} started persisting bucket cache to file",
+      Thread.currentThread().getName());
     if (!isCachePersistent()) {
       throw new IOException("Attempt to persist non-persistent cache mappings!");
     }
     File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
     try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
       fos.write(ProtobufMagic.PB_MAGIC);
       BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
+    } catch (IOException e) {
+      LOG.error("Failed to persist bucket cache to file", e);
+      throw e;
     }
+    LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
+      Thread.currentThread().getName());
     if (!tempPersistencePath.renameTo(new File(persistencePath))) {
       LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if "
         + "RS crashes/restarts before we successfully checkpoint again.");
     }
   }
 
-  private boolean isCachePersistent() {
+  public boolean isCachePersistent() {
     return ioEngine.isPersistent() && persistencePath != null;
   }
 
   /**
    * @see #persistToFile()
    */
   private void retrieveFromFile(int[] bucketSizes) throws IOException {
+    LOG.info("Started retrieving bucket cache from file");
     File persistenceFile = new File(persistencePath);
     if (!persistenceFile.exists()) {
+      LOG.warn("Persistence file missing! "
+        + "It's ok if it's first run after enabling persistent cache.");
+      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
+      blockNumber.add(backingMap.size());
+      backingMapValidated.set(true);
       return;
     }
     assert !cacheEnabled;
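`persistToFile()` keeps the existing write-to-temp-then-rename scheme: the live persistence file is only replaced once the new snapshot is fully written, so a crash mid-write leaves the previous checkpoint intact. A self-contained sketch of that pattern using `java.nio.file` (the `checkpoint` helper is hypothetical, not part of the patch):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the write-temp-then-rename checkpoint pattern used by persistToFile().
final class AtomicCheckpoint {
  static void checkpoint(Path liveFile, byte[] snapshot) throws IOException {
    // Unique temp name, mirroring persistencePath + currentTime() in the patch.
    Path temp = liveFile.resolveSibling(liveFile.getFileName() + "." + System.nanoTime());
    Files.write(temp, snapshot); // a crash here leaves the old checkpoint untouched
    // The rename is the commit point; ATOMIC_MOVE fails rather than leave a partial file.
    Files.move(temp, liveFile, StandardCopyOption.ATOMIC_MOVE);
  }
}
```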
@@ -1296,6 +1318,7 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
       parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
       bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
       blockNumber.add(backingMap.size());
+      LOG.info("Bucket cache retrieved from file successfully");
     }
   }
 
@@ -1368,27 +1391,43 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio
       try {
         ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
           algorithm);
+        backingMapValidated.set(true);
       } catch (IOException e) {
         LOG.warn("Checksum for cache file failed. "
-          + "We need to validate each cache key in the backing map. This may take some time...");
-        long startTime = EnvironmentEdgeManager.currentTime();
-        int totalKeysOriginally = backingMap.size();
-        for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
-          try {
-            ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
-          } catch (IOException e1) {
-            LOG.debug("Check for key {} failed. Removing it from map.", keyEntry.getKey());
-            backingMap.remove(keyEntry.getKey());
-            fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+          + "We need to validate each cache key in the backing map. "
+          + "This may take some time, so we'll do it in a background thread,");
+        Runnable cacheValidator = () -> {
+          while (bucketAllocator == null) {
+            try {
+              Thread.sleep(50);
+            } catch (InterruptedException ex) {
+              throw new RuntimeException(ex);
+            }
           }
-        }
-        LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
-          totalKeysOriginally, backingMap.size(),
-          (EnvironmentEdgeManager.currentTime() - startTime));
+          long startTime = EnvironmentEdgeManager.currentTime();
+          int totalKeysOriginally = backingMap.size();
+          for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
+            try {
+              ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+            } catch (IOException e1) {
+              LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
+              evictBlock(keyEntry.getKey());
+              fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+            }
+          }
+          backingMapValidated.set(true);
+          LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
+            totalKeysOriginally, backingMap.size(),
+            (EnvironmentEdgeManager.currentTime() - startTime));
+        };
+        Thread t = new Thread(cacheValidator);
+        t.setDaemon(true);
+        t.start();
       }
     } else {
       // if has not checksum, it means the persistence file is old format
       LOG.info("Persistent file is old format, it does not support verifying file integrity!");
+      backingMapValidated.set(true);
     }
     verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
   }
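The key behavioral change above: when the whole-file checksum fails, per-key validation now runs in a daemon thread instead of blocking startup, failed keys are evicted via `evictBlock` rather than only dropped from the map, and `backingMapValidated` is published once the sweep finishes. A self-contained sketch of the background-validation pattern (generic types and the `isValid` predicate are illustrative):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Sketch: validate recovered entries off the startup path and publish completion.
final class BackgroundValidator {
  static <K, V> void validateAsync(ConcurrentHashMap<K, V> recovered, Predicate<V> isValid,
      AtomicBoolean done) {
    Thread t = new Thread(() -> {
      int before = recovered.size();
      long start = System.currentTimeMillis();
      // ConcurrentHashMap iterators are weakly consistent, so removing while
      // iterating is safe and never throws ConcurrentModificationException.
      for (Iterator<Map.Entry<K, V>> it = recovered.entrySet().iterator(); it.hasNext();) {
        if (!isValid.test(it.next().getValue())) {
          it.remove();
        }
      }
      done.set(true); // readers gate on this, as with backingMapValidated
      System.out.printf("validated %d keys, kept %d in %dms%n", before, recovered.size(),
        System.currentTimeMillis() - start);
    });
    t.setDaemon(true); // don't block JVM exit, matching the patch
    t.start();
  }
}
```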
@@ -1417,6 +1456,7 @@ private void checkIOErrorIsTolerated() {
    */
   private void disableCache() {
     if (!cacheEnabled) return;
+    LOG.info("Disabling cache");
     cacheEnabled = false;
     ioEngine.shutdown();
     this.scheduleThreadPool.shutdown();
@@ -1441,11 +1481,15 @@ public void shutdown() {
     LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
       + persistencePath);
     if (ioEngine.isPersistent() && persistencePath != null) {
-      if (cachePersister != null) {
-        cachePersister.interrupt();
-      }
       try {
         join();
+        if (cachePersister != null) {
+          LOG.info("Shutting down cache persister thread.");
+          cachePersister.shutdown();
+          while (cachePersister.isAlive()) {
+            Thread.sleep(10);
+          }
+        }
         persistToFile();
       } catch (IOException ex) {
         LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
@@ -1650,17 +1694,17 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
         HFileBlock block = (HFileBlock) data;
         ByteBuff sliceBuf = block.getBufferReadOnly();
         block.getMetaData(metaBuff);
-        ioEngine.write(sliceBuf, offset);
-        // adds the cache time after the block and metadata part
+        // adds the cache time prior to the block and metadata part
         if (isCachePersistent) {
-          ioEngine.write(metaBuff, offset + len - metaBuff.limit() - Long.BYTES);
           ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
           buffer.putLong(bucketEntry.getCachedTime());
           buffer.rewind();
-          ioEngine.write(buffer, (offset + len - Long.BYTES));
+          ioEngine.write(buffer, offset);
+          ioEngine.write(sliceBuf, (offset + Long.BYTES));
         } else {
-          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
+          ioEngine.write(sliceBuf, offset);
         }
+        ioEngine.write(metaBuff, offset + len - metaBuff.limit());
       } else {
         // Only used for testing.
         ByteBuffer bb = ByteBuffer.allocate(len);
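This `writeToCache` change rearranges the persistent on-disk record from `[block][metadata][cachedTime]` to `[cachedTime][block][metadata]`, so the timestamp now sits at a fixed offset from the entry's start rather than at the tail. A self-contained sketch of the new layout (class and method names are illustrative, not from the patch):

```java
import java.nio.ByteBuffer;

// Sketch of the new persistent record layout: [cachedTime (8B)][block][metadata].
final class PersistentRecord {
  static ByteBuffer serialize(long cachedTime, byte[] block, byte[] metadata) {
    ByteBuffer out = ByteBuffer.allocate(Long.BYTES + block.length + metadata.length);
    out.putLong(cachedTime);  // offset 0; previously written at (len - Long.BYTES)
    out.put(block);           // offset Long.BYTES; previously at offset 0
    out.put(metadata);        // tail position, offset len - metadata.length
    out.flip();
    return out;
  }

  // The cache time is now readable at a fixed offset from the entry start.
  static long readCachedTime(ByteBuffer record) {
    return record.getLong(0);
  }
}
```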
@@ -1902,11 +1946,15 @@ public Map<BlockCacheKey, BucketEntry> getBackingMap() {
     return backingMap;
   }
 
+  public AtomicBoolean getBackingMapValidated() {
+    return backingMapValidated;
+  }
+
   public Map<String, Boolean> getFullyCachedFiles() {
     return fullyCachedFiles;
   }
 
-  public static Optional<BucketCache> getBuckedCacheFromCacheConfig(CacheConfig cacheConf) {
+  public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
     if (cacheConf.getBlockCache().isPresent()) {
       BlockCache bc = cacheConf.getBlockCache().get();
       if (bc instanceof CombinedBlockCache) {