|
22 | 22 |
|
23 | 23 | import java.io.IOException;
|
24 | 24 | import java.lang.management.ManagementFactory;
|
25 |
| -import java.lang.management.MemoryUsage; |
| 25 | +import java.util.concurrent.Executor; |
| 26 | +import java.util.concurrent.ForkJoinPool; |
26 | 27 |
|
27 | 28 | import org.apache.commons.logging.Log;
|
28 | 29 | import org.apache.commons.logging.LogFactory;
|
|
45 | 46 | public class CacheConfig {
|
46 | 47 | private static final Log LOG = LogFactory.getLog(CacheConfig.class.getName());
|
47 | 48 |
|
| 49 | + /** |
| 50 | + * Configuration key to cache block policy (Lru, TinyLfu). |
| 51 | + */ |
| 52 | + public static final String HFILE_BLOCK_CACHE_POLICY_KEY = "hfile.block.cache.policy"; |
| 53 | + public static final String HFILE_BLOCK_CACHE_POLICY_DEFAULT = "LRU"; |
| 54 | + |
48 | 55 | /**
|
49 | 56 | * Configuration key to cache data blocks on write. There are separate
|
50 | 57 | * switches for bloom blocks and non-root index blocks.
|
@@ -92,19 +99,19 @@ public class CacheConfig {
|
92 | 99 | * is an in-memory map that needs to be persisted across restarts. Where to store this
|
93 | 100 | * in-memory state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
|
94 | 101 | */
|
95 |
| - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
| 102 | + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
96 | 103 | "hbase.bucketcache.persistent.path";
|
97 | 104 |
|
98 | 105 | /**
|
99 | 106 | * If the bucket cache is used in league with the lru on-heap block cache (meta blocks such
|
100 | 107 | * as indices and blooms are kept in the lru blockcache and the data blocks in the
|
101 | 108 | * bucket cache).
|
102 | 109 | */
|
103 |
| - public static final String BUCKET_CACHE_COMBINED_KEY = |
| 110 | + public static final String BUCKET_CACHE_COMBINED_KEY = |
104 | 111 | "hbase.bucketcache.combinedcache.enabled";
|
105 | 112 |
|
106 | 113 | public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
|
107 |
| - public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
| 114 | + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
108 | 115 | "hbase.bucketcache.writer.queuelength";
|
109 | 116 |
|
110 | 117 | /**
|
@@ -177,6 +184,7 @@ private static enum ExternalBlockCaches {
|
177 | 184 | memcached("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache");
|
178 | 185 | // TODO(eclark): Consider more. Redis, etc.
|
179 | 186 | Class<? extends BlockCache> clazz;
|
| 187 | + @SuppressWarnings("unchecked") |
180 | 188 | ExternalBlockCaches(String clazzName) {
|
181 | 189 | try {
|
182 | 190 | clazz = (Class<? extends BlockCache>) Class.forName(clazzName);
|
@@ -511,7 +519,9 @@ public boolean shouldCacheDataCompressed() {
|
511 | 519 | * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise
|
512 | 520 | */
|
513 | 521 | public boolean shouldCacheCompressed(BlockCategory category) {
|
514 |
| - if (!isBlockCacheEnabled()) return false; |
| 522 | + if (!isBlockCacheEnabled()) { |
| 523 | + return false; |
| 524 | + } |
515 | 525 | switch (category) {
|
516 | 526 | case DATA:
|
517 | 527 | return this.cacheDataCompressed;
|
@@ -614,28 +624,62 @@ public String toString() {
|
614 | 624 | // Clear this if in tests you'd make more than one block cache instance.
|
615 | 625 | @VisibleForTesting
|
616 | 626 | static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE;
|
617 |
| - private static LruBlockCache GLOBAL_L1_CACHE_INSTANCE = null; |
618 |
| - private static BlockCache GLOBAL_L2_CACHE_INSTANCE = null; |
| 627 | + private static FirstLevelBlockCache GLOBAL_L1_CACHE_INSTANCE; |
| 628 | + private static BlockCache GLOBAL_L2_CACHE_INSTANCE; |
| 629 | + private static ForkJoinPool GLOBAL_FORKJOIN_POOL; |
619 | 630 |
|
620 | 631 | /** Boolean whether we have disabled the block cache entirely. */
|
621 | 632 | @VisibleForTesting
|
622 | 633 | static boolean blockCacheDisabled = false;
|
623 | 634 |
|
624 | 635 | /**
|
625 |
| - * @param c Configuration to use. |
626 |
| - * @return An L1 instance. Currently an instance of LruBlockCache. |
| 636 | + * @param c Configuration to use |
| 637 | + * @return An L1 instance |
| 638 | + */ |
| 639 | + public static FirstLevelBlockCache getL1(final Configuration c) { |
| 640 | + long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); |
| 641 | + long l1CacheSize = HeapMemorySizeUtil.getFirstLevelCacheSize(c, xmx); |
| 642 | + return getL1(l1CacheSize, c); |
| 643 | + } |
| 644 | + |
| 645 | + /** |
| 646 | + * @param c Configuration to use |
| 647 | + * @param xmx Max heap memory |
| 648 | + * @return An L1 instance |
627 | 649 | */
|
628 |
| - private static synchronized LruBlockCache getL1(final Configuration c) { |
| 650 | + |
| 651 | + private synchronized static FirstLevelBlockCache getL1(long cacheSize, Configuration c) { |
629 | 652 | if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE;
|
630 |
| - final long lruCacheSize = HeapMemorySizeUtil.getLruCacheSize(c); |
631 |
| - if (lruCacheSize < 0) { |
632 |
| - blockCacheDisabled = true; |
| 653 | + if (cacheSize < 0) { |
| 654 | + return null; |
633 | 655 | }
|
634 |
| - if (blockCacheDisabled) return null; |
| 656 | + String policy = c.get(HFILE_BLOCK_CACHE_POLICY_KEY, HFILE_BLOCK_CACHE_POLICY_DEFAULT); |
635 | 657 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
|
636 |
| - LOG.info("Allocating LruBlockCache size=" + |
637 |
| - StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
638 |
| - GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(lruCacheSize, blockSize, true, c); |
| 658 | + LOG.info("Allocating BlockCache size=" + |
| 659 | + StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
| 660 | + if (policy.equalsIgnoreCase("LRU")) { |
| 661 | + GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c); |
| 662 | + } else if (policy.equalsIgnoreCase("TinyLFU")) { |
| 663 | + if (GLOBAL_FORKJOIN_POOL == null) { |
| 664 | + GLOBAL_FORKJOIN_POOL = new ForkJoinPool(); |
| 665 | + } |
| 666 | + Class<?> tinyLFUClass; |
| 667 | + try { |
| 668 | + tinyLFUClass = Class.forName("org.apache.hadoop.hbase.io.hfile.TinyLfuBlockCache"); |
| 669 | + GLOBAL_L1_CACHE_INSTANCE = (FirstLevelBlockCache) |
| 670 | + tinyLFUClass.getDeclaredConstructor(long.class, long.class, Executor.class, |
| 671 | + Configuration.class) |
| 672 | + .newInstance(cacheSize, blockSize, GLOBAL_FORKJOIN_POOL, c); |
| 673 | + } catch (Exception e) { |
| 674 | + throw new RuntimeException( |
| 675 | + "Unable to instantiate the TinyLfuBlockCache block cache policy." + |
| 676 | + "If you want to use TinyLFU you must build with JDK8+, run with JRE8+, and have both " + |
| 677 | + "the hbase-tinylfu-blockcache module and its dependency the caffiene library " + |
| 678 | + "installed into the classpath.", e); |
| 679 | + } |
| 680 | + } else { |
| 681 | + throw new IllegalArgumentException("Unknown block cache policy " + policy); |
| 682 | + } |
639 | 683 | return GLOBAL_L1_CACHE_INSTANCE;
|
640 | 684 | }
|
641 | 685 |
|
@@ -676,7 +720,7 @@ public CacheStats getL2Stats() {
|
676 | 720 | }
|
677 | 721 |
|
678 | 722 | private static BlockCache getExternalBlockcache(Configuration c) {
|
679 |
| - Class klass = null; |
| 723 | + Class<?> klass = null; |
680 | 724 |
|
681 | 725 | // Get the class, from the config. s
|
682 | 726 | try {
|
@@ -704,7 +748,9 @@ private static BlockCache getExternalBlockcache(Configuration c) {
|
704 | 748 | private static BlockCache getBucketCache(Configuration c) {
|
705 | 749 | // Check for L2. ioengine name must be non-null.
|
706 | 750 | String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
|
707 |
| - if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; |
| 751 | + if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) { |
| 752 | + return null; |
| 753 | + } |
708 | 754 |
|
709 | 755 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
|
710 | 756 | final long bucketCacheSize = HeapMemorySizeUtil.getBucketCacheSize(c);
|
@@ -762,33 +808,35 @@ private static BlockCache getBucketCache(Configuration c) {
|
762 | 808 | * @return The block cache or <code>null</code>.
|
763 | 809 | */
|
764 | 810 | public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
|
765 |
| - if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; |
766 |
| - if (blockCacheDisabled) return null; |
| 811 | + if (GLOBAL_BLOCK_CACHE_INSTANCE != null) { |
| 812 | + return GLOBAL_BLOCK_CACHE_INSTANCE; |
| 813 | + } |
| 814 | + if (blockCacheDisabled) { |
| 815 | + return null; |
| 816 | + } |
767 | 817 | if (conf.get(DEPRECATED_BLOCKCACHE_BLOCKSIZE_KEY) != null) {
|
768 | 818 | LOG.warn("The config key " + DEPRECATED_BLOCKCACHE_BLOCKSIZE_KEY +
|
769 | 819 | " is deprecated now, instead please use " + BLOCKCACHE_BLOCKSIZE_KEY +". "
|
770 | 820 | + "In future release we will remove the deprecated config.");
|
771 | 821 | }
|
772 |
| - LruBlockCache l1 = getL1(conf); |
773 |
| - // blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call. |
774 |
| - if (blockCacheDisabled) return null; |
| 822 | + FirstLevelBlockCache l1 = getL1(conf); |
775 | 823 | BlockCache l2 = getL2(conf);
|
776 | 824 | if (l2 == null) {
|
777 | 825 | GLOBAL_BLOCK_CACHE_INSTANCE = l1;
|
778 | 826 | } else {
|
779 | 827 | boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
|
780 |
| - boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
| 828 | + boolean combinedWithL1 = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
781 | 829 | DEFAULT_BUCKET_CACHE_COMBINED);
|
782 | 830 | if (useExternal) {
|
783 | 831 | GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
|
784 | 832 | } else {
|
785 |
| - if (combinedWithLru) { |
| 833 | + if (combinedWithL1) { |
786 | 834 | GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
|
787 | 835 | } else {
|
788 |
| - // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler |
789 |
| - // mechanism. It is a little ugly but works according to the following: when the |
790 |
| - // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get |
791 |
| - // a block from the L1 cache, if not in L1, we will search L2. |
| 836 | + // L1 and L2 are not 'combined'. They are connected via the FirstLevelBlockCache |
| 837 | + // victimhandler mechanism. It is a little ugly but works according to the following: |
| 838 | + // when the background eviction thread runs, blocks evicted from L1 will go to L2 AND when |
| 839 | + // we get a block from the L1 cache, if not in L1, we will search L2. |
792 | 840 | GLOBAL_BLOCK_CACHE_INSTANCE = l1;
|
793 | 841 | }
|
794 | 842 | }
|
|
0 commit comments