|
22 | 22 |
|
23 | 23 | import java.io.IOException;
|
24 | 24 | import java.lang.management.ManagementFactory;
|
25 |
| -import java.lang.management.MemoryUsage; |
| 25 | +import java.util.concurrent.Executor; |
| 26 | +import java.util.concurrent.ForkJoinPool; |
26 | 27 |
|
27 | 28 | import org.apache.commons.logging.Log;
|
28 | 29 | import org.apache.commons.logging.LogFactory;
|
|
43 | 44 | public class CacheConfig {
|
44 | 45 | private static final Log LOG = LogFactory.getLog(CacheConfig.class.getName());
|
45 | 46 |
|
| 47 | + /** |
| 48 | + * Configuration key to cache block policy (Lru, TinyLfu). |
| 49 | + */ |
| 50 | + public static final String HFILE_BLOCK_CACHE_POLICY_KEY = "hfile.block.cache.policy"; |
| 51 | + public static final String HFILE_BLOCK_CACHE_POLICY_DEFAULT = "LRU"; |
| 52 | + |
46 | 53 | /**
|
47 | 54 | * Configuration key to cache data blocks on write. There are separate
|
48 | 55 | * switches for bloom blocks and non-root index blocks.
|
@@ -90,19 +97,19 @@ public class CacheConfig {
|
90 | 97 | * is an in-memory map that needs to be persisted across restarts. Where to store this
|
91 | 98 | * in-memory state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
|
92 | 99 | */
|
93 |
| - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
| 100 | + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
94 | 101 | "hbase.bucketcache.persistent.path";
|
95 | 102 |
|
96 | 103 | /**
|
97 | 104 | * If the bucket cache is used in league with the lru on-heap block cache (meta blocks such
|
98 | 105 | * as indices and blooms are kept in the lru blockcache and the data blocks in the
|
99 | 106 | * bucket cache).
|
100 | 107 | */
|
101 |
| - public static final String BUCKET_CACHE_COMBINED_KEY = |
| 108 | + public static final String BUCKET_CACHE_COMBINED_KEY = |
102 | 109 | "hbase.bucketcache.combinedcache.enabled";
|
103 | 110 |
|
104 | 111 | public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
|
105 |
| - public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
| 112 | + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
106 | 113 | "hbase.bucketcache.writer.queuelength";
|
107 | 114 |
|
108 | 115 | /**
|
@@ -175,6 +182,7 @@ private static enum ExternalBlockCaches {
|
175 | 182 | memcached("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache");
|
176 | 183 | // TODO(eclark): Consider more. Redis, etc.
|
177 | 184 | Class<? extends BlockCache> clazz;
|
| 185 | + @SuppressWarnings("unchecked") |
178 | 186 | ExternalBlockCaches(String clazzName) {
|
179 | 187 | try {
|
180 | 188 | clazz = (Class<? extends BlockCache>) Class.forName(clazzName);
|
@@ -507,7 +515,9 @@ public boolean shouldCacheDataCompressed() {
|
507 | 515 | * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise
|
508 | 516 | */
|
509 | 517 | public boolean shouldCacheCompressed(BlockCategory category) {
|
510 |
| - if (!isBlockCacheEnabled()) return false; |
| 518 | + if (!isBlockCacheEnabled()) { |
| 519 | + return false; |
| 520 | + } |
511 | 521 | switch (category) {
|
512 | 522 | case DATA:
|
513 | 523 | return this.cacheDataCompressed;
|
@@ -609,27 +619,61 @@ public String toString() {
|
609 | 619 | */
|
610 | 620 | // Clear this if in tests you'd make more than one block cache instance.
|
611 | 621 | static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE;
|
612 |
| - private static LruBlockCache GLOBAL_L1_CACHE_INSTANCE = null; |
613 |
| - private static BlockCache GLOBAL_L2_CACHE_INSTANCE = null; |
| 622 | + private static FirstLevelBlockCache GLOBAL_L1_CACHE_INSTANCE; |
| 623 | + private static BlockCache GLOBAL_L2_CACHE_INSTANCE; |
| 624 | + private static ForkJoinPool GLOBAL_FORKJOIN_POOL; |
614 | 625 |
|
615 | 626 | /** Boolean whether we have disabled the block cache entirely. */
|
616 | 627 | static boolean blockCacheDisabled = false;
|
617 | 628 |
|
618 | 629 | /**
|
619 |
| - * @param c Configuration to use. |
620 |
| - * @return An L1 instance. Currently an instance of LruBlockCache. |
| 630 | + * @param c Configuration to use |
| 631 | + * @return An L1 instance |
| 632 | + */ |
| 633 | + public static FirstLevelBlockCache getL1(final Configuration c) { |
| 634 | + long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); |
| 635 | + long l1CacheSize = HeapMemorySizeUtil.getFirstLevelCacheSize(c, xmx); |
| 636 | + return getL1(l1CacheSize, c); |
| 637 | + } |
| 638 | + |
| 639 | + /** |
| 640 | + * @param c Configuration to use |
| 641 | + * @param xmx Max heap memory |
| 642 | + * @return An L1 instance |
621 | 643 | */
|
622 |
| - private static synchronized LruBlockCache getL1(final Configuration c) { |
| 644 | + |
| 645 | + private synchronized static FirstLevelBlockCache getL1(long cacheSize, Configuration c) { |
623 | 646 | if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE;
|
624 |
| - final long lruCacheSize = HeapMemorySizeUtil.getLruCacheSize(c); |
625 |
| - if (lruCacheSize < 0) { |
626 |
| - blockCacheDisabled = true; |
| 647 | + if (cacheSize < 0) { |
| 648 | + return null; |
627 | 649 | }
|
628 |
| - if (blockCacheDisabled) return null; |
| 650 | + String policy = c.get(HFILE_BLOCK_CACHE_POLICY_KEY, HFILE_BLOCK_CACHE_POLICY_DEFAULT); |
629 | 651 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
|
630 |
| - LOG.info("Allocating LruBlockCache size=" + |
631 |
| - StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
632 |
| - GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(lruCacheSize, blockSize, true, c); |
| 652 | + LOG.info("Allocating BlockCache size=" + |
| 653 | + StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
| 654 | + if (policy.equalsIgnoreCase("LRU")) { |
| 655 | + GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c); |
| 656 | + } else if (policy.equalsIgnoreCase("TinyLFU")) { |
| 657 | + if (GLOBAL_FORKJOIN_POOL == null) { |
| 658 | + GLOBAL_FORKJOIN_POOL = new ForkJoinPool(); |
| 659 | + } |
| 660 | + Class<?> tinyLFUClass; |
| 661 | + try { |
| 662 | + tinyLFUClass = Class.forName("org.apache.hadoop.hbase.io.hfile.TinyLfuBlockCache"); |
| 663 | + GLOBAL_L1_CACHE_INSTANCE = (FirstLevelBlockCache) |
| 664 | + tinyLFUClass.getDeclaredConstructor(long.class, long.class, Executor.class, |
| 665 | + Configuration.class) |
| 666 | + .newInstance(cacheSize, blockSize, GLOBAL_FORKJOIN_POOL, c); |
| 667 | + } catch (Exception e) { |
| 668 | + throw new RuntimeException( |
| 669 | + "Unable to instantiate the TinyLfuBlockCache block cache policy." + |
| 670 | + "If you want to use TinyLFU you must build with JDK8+, run with JRE8+, and have both " + |
| 671 | + "the hbase-tinylfu-blockcache module and its dependency the caffiene library " + |
| 672 | + "installed into the classpath.", e); |
| 673 | + } |
| 674 | + } else { |
| 675 | + throw new IllegalArgumentException("Unknown block cache policy " + policy); |
| 676 | + } |
633 | 677 | return GLOBAL_L1_CACHE_INSTANCE;
|
634 | 678 | }
|
635 | 679 |
|
@@ -669,7 +713,7 @@ public CacheStats getL2Stats() {
|
669 | 713 | }
|
670 | 714 |
|
671 | 715 | private static BlockCache getExternalBlockcache(Configuration c) {
|
672 |
| - Class klass = null; |
| 716 | + Class<?> klass = null; |
673 | 717 |
|
674 | 718 | // Get the class, from the config. s
|
675 | 719 | try {
|
@@ -697,7 +741,9 @@ private static BlockCache getExternalBlockcache(Configuration c) {
|
697 | 741 | private static BlockCache getBucketCache(Configuration c) {
|
698 | 742 | // Check for L2. ioengine name must be non-null.
|
699 | 743 | String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
|
700 |
| - if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; |
| 744 | + if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) { |
| 745 | + return null; |
| 746 | + } |
701 | 747 |
|
702 | 748 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
|
703 | 749 | final long bucketCacheSize = HeapMemorySizeUtil.getBucketCacheSize(c);
|
@@ -755,33 +801,35 @@ private static BlockCache getBucketCache(Configuration c) {
|
755 | 801 | * @return The block cache or <code>null</code>.
|
756 | 802 | */
|
757 | 803 | public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
|
758 |
| - if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; |
759 |
| - if (blockCacheDisabled) return null; |
| 804 | + if (GLOBAL_BLOCK_CACHE_INSTANCE != null) { |
| 805 | + return GLOBAL_BLOCK_CACHE_INSTANCE; |
| 806 | + } |
| 807 | + if (blockCacheDisabled) { |
| 808 | + return null; |
| 809 | + } |
760 | 810 | if (conf.get(DEPRECATED_BLOCKCACHE_BLOCKSIZE_KEY) != null) {
|
761 | 811 | LOG.warn("The config key " + DEPRECATED_BLOCKCACHE_BLOCKSIZE_KEY +
|
762 | 812 | " is deprecated now, instead please use " + BLOCKCACHE_BLOCKSIZE_KEY +". "
|
763 | 813 | + "In future release we will remove the deprecated config.");
|
764 | 814 | }
|
765 |
| - LruBlockCache l1 = getL1(conf); |
766 |
| - // blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call. |
767 |
| - if (blockCacheDisabled) return null; |
| 815 | + FirstLevelBlockCache l1 = getL1(conf); |
768 | 816 | BlockCache l2 = getL2(conf);
|
769 | 817 | if (l2 == null) {
|
770 | 818 | GLOBAL_BLOCK_CACHE_INSTANCE = l1;
|
771 | 819 | } else {
|
772 | 820 | boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
|
773 |
| - boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
| 821 | + boolean combinedWithL1 = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
774 | 822 | DEFAULT_BUCKET_CACHE_COMBINED);
|
775 | 823 | if (useExternal) {
|
776 | 824 | GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
|
777 | 825 | } else {
|
778 |
| - if (combinedWithLru) { |
| 826 | + if (combinedWithL1) { |
779 | 827 | GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
|
780 | 828 | } else {
|
781 |
| - // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler |
782 |
| - // mechanism. It is a little ugly but works according to the following: when the |
783 |
| - // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get |
784 |
| - // a block from the L1 cache, if not in L1, we will search L2. |
| 829 | + // L1 and L2 are not 'combined'. They are connected via the FirstLevelBlockCache |
| 830 | + // victimhandler mechanism. It is a little ugly but works according to the following: |
| 831 | + // when the background eviction thread runs, blocks evicted from L1 will go to L2 AND when |
| 832 | + // we get a block from the L1 cache, if not in L1, we will search L2. |
785 | 833 | GLOBAL_BLOCK_CACHE_INSTANCE = l1;
|
786 | 834 | l1.setVictimCache(l2);
|
787 | 835 | }
|
|
0 commit comments