From b6ab4ef654ee148a8b9cddf89554e1f46a5759f7 Mon Sep 17 00:00:00 2001 From: mohammadshahidkhan Date: Wed, 5 Oct 2016 10:15:09 +0530 Subject: [PATCH 1/2] [CARBONDATA-484] Implement LRU cache for B-Tree + fixed impacted test cases --- .../carbondata/core/cache/CacheProvider.java | 69 ++- .../carbondata/core/cache/CacheType.java | 16 + .../carbondata/core/cache/CarbonLRUCache.java | 81 ++-- .../AbstractBlockIndexStoreCache.java | 117 +++++ .../carbon/datastore/BlockIndexStore.java | 456 ++++++++---------- .../datastore/SegmentTaskIndexStore.java | 344 ++++++------- .../TableSegmentUniqueIdentifier.java | 136 ++++++ .../carbon/datastore/block/AbstractIndex.java | 70 ++- .../carbon/datastore/block/BlockInfo.java | 27 ++ .../block/SegmentTaskIndexWrapper.java | 121 +++++ .../block/TableBlockUniqueIdentifier.java | 72 +++ .../core/carbon/path/CarbonTablePath.java | 15 +- .../core/constants/CarbonCommonConstants.java | 17 +- .../carbondata/core/util/CarbonUtil.java | 65 +++ .../executor/impl/AbstractQueryExecutor.java | 33 +- .../AbstractDetailQueryResultIterator.java | 1 + .../core/cache/CacheProviderTest.java | 2 +- .../ForwardDictionaryCacheTest.java | 2 +- .../ReverseDictionaryCacheTest.java | 2 +- .../datastore/SegmentTaskIndexStoreTest.java | 37 +- .../carbon/datastore/block/BlockInfoTest.java | 16 +- .../carbondata/core/util/CarbonUtilTest.java | 2 +- .../util/DataFileFooterConverterTest.java | 4 +- .../apache/carbondata/hadoop/CacheClient.java | 96 ++++ .../carbondata/hadoop/CarbonInputFormat.java | 50 +- .../index/impl/InMemoryBTreeIndex.java | 45 +- .../spark/rdd/CarbonMergerRDD.scala | 3 - .../org/apache/spark/sql/CarbonEnv.scala | 3 + .../MajorCompactionIgnoreInMinorTest.scala | 21 +- .../carbon/datastore/BlockIndexStoreTest.java | 78 ++- 30 files changed, 1443 insertions(+), 558 deletions(-) create mode 100644 core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java create mode 100644 core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java create mode 100644 core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java create mode 100644 core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java create mode 100644 hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java index fa505bf14cb..7d92ca27edf 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java @@ -22,11 +22,20 @@ import java.util.HashMap; import java.util.Map; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.cache.dictionary.ForwardDictionaryCache; import org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache; +import org.apache.carbondata.core.carbon.datastore.BlockIndexStore; +import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; +import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; +import 
org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; /** * Cache provider class which will create a cache based on given type @@ -45,15 +54,19 @@ public class CacheProvider { new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); /** - * a map that will hold the mapping of cache type to LRU cache instance + * object lock instance to be used in synchronization block */ - private Map cacheTypeToLRUCacheMap = - new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + private final Object lock = new Object(); + /** + * LRU cache instance + */ + private CarbonLRUCache carbonLRUCache; /** - * object lock instance to be used in synchronization block + * instance for CacheProvider LOGGER */ - private final Object lock = new Object(); + private static final LogService LOGGER = + LogServiceFactory.getLogService(CacheProvider.class.getName()); /** * private constructor to follow singleton design pattern for this class @@ -85,7 +98,7 @@ public Cache createCache(CacheType cacheType, String carbonStorePat if (!dictionaryCacheAlreadyExists(cacheType)) { synchronized (lock) { if (!dictionaryCacheAlreadyExists(cacheType)) { - if (null == cacheTypeToLRUCacheMap.get(cacheType)) { + if (null == carbonLRUCache) { createLRULevelCacheInstance(cacheType); } createDictionaryCacheForGivenType(cacheType, carbonStorePath); @@ -106,11 +119,18 @@ private void createDictionaryCacheForGivenType(CacheType cacheType, String carbo if (cacheType.equals(CacheType.REVERSE_DICTIONARY)) { cacheObject = new ReverseDictionaryCache(carbonStorePath, - cacheTypeToLRUCacheMap.get(cacheType)); + carbonLRUCache); } else if (cacheType.equals(CacheType.FORWARD_DICTIONARY)) { cacheObject = new ForwardDictionaryCache(carbonStorePath, - cacheTypeToLRUCacheMap.get(cacheType)); + carbonLRUCache); + } else if (cacheType.equals(cacheType.EXECUTOR_BTREE)) { + cacheObject = new BlockIndexStore(carbonStorePath, + carbonLRUCache); + } else if (cacheType.equals(cacheType.DRIVER_BTREE)) { + cacheObject = + new SegmentTaskIndexStore( + carbonStorePath, carbonLRUCache); } cacheTypeToCacheMap.put(cacheType, cacheObject); } @@ -121,15 +141,25 @@ private void createDictionaryCacheForGivenType(CacheType cacheType, String carbo * @param cacheType */ private void createLRULevelCacheInstance(CacheType cacheType) { - CarbonLRUCache carbonLRUCache = null; - // if cache type is dictionary cache, then same lru cache instance has to be shared - // between forward and reverse cache - if (cacheType.equals(CacheType.REVERSE_DICTIONARY) || cacheType - .equals(CacheType.FORWARD_DICTIONARY)) { - carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE, - CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE_DEFAULT); - cacheTypeToLRUCacheMap.put(CacheType.REVERSE_DICTIONARY, carbonLRUCache); - cacheTypeToLRUCacheMap.put(CacheType.FORWARD_DICTIONARY, carbonLRUCache); + boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false")); + if (isDriver) { + carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT); + } else { + // if executor cache size is not configured then driver cache conf will be used + String executorCacheSize = 
CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE); + if (null != executorCacheSize) { + carbonLRUCache = + new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT); + } else { + LOGGER.info( + "Executor LRU cache size not configured. Initializing with driver LRU cache size."); + carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT); + } } } @@ -148,7 +178,10 @@ private boolean dictionaryCacheAlreadyExists(CacheType cacheType) { * Below method will be used to clear the cache */ public void dropAllCache() { - cacheTypeToLRUCacheMap.clear(); + if(null != carbonLRUCache) { + carbonLRUCache.clear(); + carbonLRUCache= null; + } cacheTypeToCacheMap.clear(); } } diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java index ea511e94dd4..5cc0282ebcb 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java @@ -21,6 +21,10 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; +import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; /** * class which defines different cache types. cache type can be dictionary cache for @@ -41,6 +45,18 @@ public class CacheType { public static final CacheType REVERSE_DICTIONARY = new CacheType("reverse_dictionary"); + /** + * Executor BTree cache which maintains size of BTree metadata + */ + public static final CacheType EXECUTOR_BTREE = + new CacheType("executor_btree"); + + /** + * Executor BTree cache which maintains size of BTree metadata + */ + public static final CacheType + DRIVER_BTREE = new CacheType("driver_btree"); + /** * cacheName which is unique name for a cache */ diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java index 4ba38e4f366..ca77c9cbd8d 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java @@ -70,11 +70,11 @@ public CarbonLRUCache(String propertyName, String defaultPropertyName) { } initCache(); if (lruCacheMemorySize > 0) { - LOGGER.info("Configured level cahce size is " + lruCacheMemorySize + " MB"); + LOGGER.info("Configured LRU cache size is " + lruCacheMemorySize + " MB"); // convert in bytes lruCacheMemorySize = lruCacheMemorySize * BYTE_CONVERSION_CONSTANT; } else { - LOGGER.info("Column cache size not configured. Therefore default behavior will be " + LOGGER.info("LRU cache size not configured. 
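The driver/executor split above reduces to a small property-resolution rule: prefer the executor size when it is set, otherwise fall back to the driver size. A minimal standalone sketch of that rule; the property keys are illustrative stand-ins for the CarbonCommonConstants entries, not the real key strings.

import java.util.Properties;

public final class LruSizeResolver {
  // Illustrative keys; the actual names live in CarbonCommonConstants.
  static final String DRIVER_KEY = "carbon.max.driver.lru.cache.size";
  static final String EXECUTOR_KEY = "carbon.max.executor.lru.cache.size";
  static final String IS_DRIVER_KEY = "is.driver.instance";

  /** Returns the property key whose value should size the LRU cache. */
  public static String resolveSizeKey(Properties props) {
    boolean isDriver = Boolean.parseBoolean(props.getProperty(IS_DRIVER_KEY, "false"));
    if (isDriver) {
      return DRIVER_KEY;
    }
    // an executor without its own setting falls back to the driver setting
    return props.getProperty(EXECUTOR_KEY) != null ? EXECUTOR_KEY : DRIVER_KEY;
  }
}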
Therefore default behavior will be " + "considered and no LRU based eviction of columns will be done"); } } @@ -159,8 +159,10 @@ private void removeKey(String key) { if (null != cacheable) { currentSize = currentSize - cacheable.getMemorySize(); } - lruCacheMap.remove(key); - LOGGER.info("Removed level entry from InMemory level lru cache :: " + key); + Cacheable remove = lruCacheMap.remove(key); + if(null != remove) { + LOGGER.info("Removed entry from InMemory lru cache :: " + key); + } } /** @@ -171,24 +173,53 @@ private void removeKey(String key) { * @param cacheInfo */ public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSize) { + LOGGER.debug("Required size for entry " + columnIdentifier + " :: " + requiredSize + + " Current cache size :: " + currentSize); boolean columnKeyAddedSuccessfully = false; - if (freeMemorySizeForAddingCache(requiredSize)) { + if (isLRUCacheSizeConfigured()) { synchronized (lruCacheMap) { - currentSize = currentSize + requiredSize; - if (null == lruCacheMap.get(columnIdentifier)) { - lruCacheMap.put(columnIdentifier, cacheInfo); + if (freeMemorySizeForAddingCache(requiredSize)) { + currentSize = currentSize + requiredSize; + addEntryToLRUCacheMap(columnIdentifier, cacheInfo); + columnKeyAddedSuccessfully = true; + } else { + LOGGER.error( + "Size not available. Entry cannot be added to lru cache :: " + columnIdentifier + + " .Required Size = " + requiredSize + " Size available " + (lruCacheMemorySize + - currentSize)); } - columnKeyAddedSuccessfully = true; } - LOGGER.debug("Added level entry to InMemory level lru cache :: " + columnIdentifier); } else { - LOGGER.error("Size not available. Column cannot be added to level lru cache :: " - + columnIdentifier + " .Required Size = " + requiredSize + " Size available " - + (lruCacheMemorySize - currentSize)); + synchronized (lruCacheMap) { + addEntryToLRUCacheMap(columnIdentifier, cacheInfo); + } + columnKeyAddedSuccessfully = true; } return columnKeyAddedSuccessfully; } + /** + * The method will add the cache entry to LRU cache map + * + * @param columnIdentifier + * @param cacheInfo + */ + private void addEntryToLRUCacheMap(String columnIdentifier, Cacheable cacheInfo) { + if (null == lruCacheMap.get(columnIdentifier)) { + lruCacheMap.put(columnIdentifier, cacheInfo); + } + LOGGER.debug("Added entry to InMemory lru cache :: " + columnIdentifier); + } + + /** + * this will check whether the LRU cache size is configured + * + * @return value + */ + private boolean isLRUCacheSizeConfigured() { + return lruCacheMemorySize > 0; + } + /** * This method will check a required column can be loaded into memory or not. 
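The put path above reserves the entry's declared size under the map lock and refuses entries that cannot fit. The same size-accounted eviction can be shown with a plain access-ordered LinkedHashMap; this is only a sketch of the idea, not the CarbonLRUCache implementation, and SizedLruCache is an illustrative name.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class SizedLruCache<K> {
  // access-order map: iteration runs from least to most recently used
  private final LinkedHashMap<K, Long> sizes = new LinkedHashMap<>(16, 0.75f, true);
  private final long capacityBytes;
  private long currentBytes;

  public SizedLruCache(long capacityBytes) {
    this.capacityBytes = capacityBytes;
  }

  /** Returns false when the entry cannot fit even after evicting everything else. */
  public synchronized boolean put(K key, long sizeBytes) {
    if (sizeBytes > capacityBytes) {
      return false;
    }
    Long previous = sizes.remove(key);
    if (previous != null) {
      currentBytes -= previous; // replacing an entry releases its old size first
    }
    // evict least-recently-used entries until the new entry fits
    Iterator<Map.Entry<K, Long>> eldestFirst = sizes.entrySet().iterator();
    while (currentBytes + sizeBytes > capacityBytes && eldestFirst.hasNext()) {
      currentBytes -= eldestFirst.next().getValue();
      eldestFirst.remove();
    }
    sizes.put(key, sizeBytes);
    currentBytes += sizeBytes;
    return true;
  }

  /** Marks the key as recently used; returns whether it is still cached. */
  public synchronized boolean touch(K key) {
    return sizes.get(key) != null;
  }

  public synchronized boolean remove(K key) {
    Long removed = sizes.remove(key);
    if (removed != null) {
      currentBytes -= removed;
      return true;
    }
    return false;
  }
}

Tracking the byte total alongside the map is what makes eviction O(entries removed) rather than a full rescan, which is also why the patch keeps currentSize as a field instead of recomputing it.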
If required
   * this method will call for eviction of existing data from memory
   *
   * @param requiredSize
   * @return
   */
  private boolean freeMemorySizeForAddingCache(long requiredSize) {
    boolean memoryAvailable = false;
-    if (lruCacheMemorySize > 0) {
+    if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
+      memoryAvailable = true;
+    } else {
+      // get the keys that can be removed from memory
+      List<String> keysToBeRemoved = getKeysToBeRemoved(requiredSize);
+      for (String cacheKey : keysToBeRemoved) {
+        removeKey(cacheKey);
+      }
+      // after removing the keys check again if required size is available
       if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
         memoryAvailable = true;
-      } else {
-        synchronized (lruCacheMap) {
-          // get the keys that can be removed from memory
-          List<String> keysToBeRemoved = getKeysToBeRemoved(requiredSize);
-          for (String cacheKey : keysToBeRemoved) {
-            removeKey(cacheKey);
-          }
-          // after removing the keys check again if required size is available
-          if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
-            memoryAvailable = true;
-          }
-        }
       }
-    } else {
-      memoryAvailable = true;
     }
     return memoryAvailable;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java
new file mode 100644
index 00000000000..c700a06ed74
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.carbondata.core.carbon.datastore; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CarbonLRUCache; +import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; +import org.apache.carbondata.core.carbon.datastore.block.BlockInfo; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; +import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.CarbonUtilException; + +/** + * This class validate and load the B-Tree in the executor lru cache + * @param cache key + * @param Block Meta data details + */ +public abstract class AbstractBlockIndexStoreCache + implements Cache { + /** + * carbon store path + */ + protected String carbonStorePath; + /** + * CarbonLRU cache + */ + protected CarbonLRUCache lruCache; + + /** + * table segment id vs blockInfo list + */ + protected Map> segmentIdToBlockListMap; + + + /** + * map of block info to lock object map, while loading the btree this will be filled + * and removed after loading the tree for that particular block info, this will be useful + * while loading the tree concurrently so only block level lock will be applied another + * block can be loaded concurrently + */ + protected Map blockInfoLock; + + /** + * The object will hold the segment ID lock so that at a time only 1 block that belongs to same + * segment & table can create the list for holding the block info + */ + protected Map segmentIDLock; + + public AbstractBlockIndexStoreCache(String carbonStorePath, CarbonLRUCache lruCache) { + this.carbonStorePath = carbonStorePath; + this.lruCache = lruCache; + blockInfoLock = new ConcurrentHashMap(); + segmentIDLock= new ConcurrentHashMap(); + segmentIdToBlockListMap = new ConcurrentHashMap<>(); + } + + /** + * This method will get the value for the given key. If value does not exist + * for the given key, it will check and load the value. + * + * @param tableBlock + * @param tableBlockUniqueIdentifier + * @param lruCacheKey + */ + protected void checkAndLoadTableBlocks(AbstractIndex tableBlock, + TableBlockUniqueIdentifier tableBlockUniqueIdentifier, String lruCacheKey) + throws CarbonUtilException { + // calculate the required size is + TableBlockInfo blockInfo = tableBlockUniqueIdentifier.getTableBlockInfo(); + long requiredMetaSize = CarbonUtil + .calculateMetaSize(blockInfo.getFilePath(), blockInfo.getBlockOffset(), + blockInfo.getBlockLength()); + if (requiredMetaSize > 0) { + tableBlock.setMemorySize(requiredMetaSize); + tableBlock.incrementAccessCount(); + boolean isTableBlockAddedToLruCache = lruCache.put(lruCacheKey, tableBlock, requiredMetaSize); + // if column is successfully added to lru cache then only load the + // table blocks data + if (isTableBlockAddedToLruCache) { + // load table blocks data + // getting the data file meta data of the block + DataFileFooter footer = CarbonUtil + .readMetadatFile(blockInfo); + footer.setBlockInfo(new BlockInfo(blockInfo)); + // building the block + tableBlock.buildIndex(Arrays.asList(footer)); + } else { + throw new CarbonUtilException( + "Cannot load table blocks into memory. 
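The ordering in checkAndLoadTableBlocks matters: admission into the LRU is decided on the precomputed metadata size before the expensive footer read and index build, so a full cache rejects a block cheaply instead of building an index only to drop it. A sketch of that reserve-then-load shape, reusing the SizedLruCache sketch above; the class and method names are illustrative.

import java.util.concurrent.Callable;

public final class ReserveThenLoad {
  /** Admits by size first; only an admitted entry pays for the expensive build. */
  public static <V> V loadIfAdmitted(SizedLruCache<String> cache, String key,
      long requiredBytes, Callable<V> expensiveBuild) throws Exception {
    if (!cache.put(key, requiredBytes)) {
      throw new IllegalStateException(
          "Cannot load table blocks into memory, not enough space: " + key);
    }
    // the size is already accounted for, so the build cannot overshoot the budget
    return expensiveBuild.call();
  }
}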
Not enough memory available"); + } + } + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java index d7ba318ccf1..9b5818fa063 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java @@ -16,113 +16,152 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.carbondata.core.carbon.datastore; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.CarbonLRUCache; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.BlockIndex; import org.apache.carbondata.core.carbon.datastore.block.BlockInfo; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; -import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.CarbonUtilException; /** - * Singleton Class to handle loading, unloading,clearing,storing of the table - * blocks + * This class is used to load the B-Tree in Executor LRU Cache */ -public class BlockIndexStore { - - /** - * singleton instance - */ - private static final BlockIndexStore CARBONTABLEBLOCKSINSTANCE = new BlockIndexStore(); - - /** - * map to hold the table and its list of blocks - */ - private Map> tableBlocksMap; - - /** - * map to maintain segment id to block info map, this map will be used to - * while removing the block from memory when segment is compacted or deleted - */ - private Map>> segmentIdToBlockListMap; - - /** - * map of block info to lock object map, while loading the btree this will be filled - * and removed after loading the tree for that particular block info, this will be useful - * while loading the tree concurrently so only block level lock will be applied another - * block can be loaded concurrently - */ - private Map blockInfoLock; +public class BlockIndexStore extends AbstractBlockIndexStoreCache { /** - * table and its lock object to this will be useful in case of concurrent - * query scenario when more than one query comes for same table and in that - * case it will ensure that only one query will able to load the blocks + * LOGGER instance */ - private Map tableLockMap; + private static final LogService LOGGER = + 
LogServiceFactory.getLogService(BlockIndexStore.class.getName()); + public BlockIndexStore(String carbonStorePath, CarbonLRUCache lruCache) { + super(carbonStorePath, lruCache); + } /** - * block info to future task mapping - * useful when blocklet distribution is enabled and - * same block is loaded by multiple thread + * The method loads the block meta in B-tree lru cache and returns the block meta. + * + * @param tableBlockUniqueIdentifier Uniquely identifies the block + * @return returns the blocks B-Tree meta + * @throws CarbonUtilException */ - private Map> mapOfBlockInfoToFuture; + @Override public AbstractIndex get(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) + throws CarbonUtilException { + TableBlockInfo tableBlockInfo = tableBlockUniqueIdentifier.getTableBlockInfo(); + BlockInfo blockInfo = new BlockInfo(tableBlockInfo); + String lruCacheKey = + getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo); + AbstractIndex tableBlock = (AbstractIndex) lruCache.get(lruCacheKey); - private BlockIndexStore() { - tableBlocksMap = new ConcurrentHashMap>( - CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - tableLockMap = new ConcurrentHashMap( - CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - blockInfoLock = new ConcurrentHashMap(); - segmentIdToBlockListMap = new ConcurrentHashMap<>(); - mapOfBlockInfoToFuture = new ConcurrentHashMap<>(); + // if block is not loaded + if (null == tableBlock) { + // check any lock object is present in + // block info lock map + Object blockInfoLockObject = blockInfoLock.get(blockInfo); + // if lock object is not present then acquire + // the lock in block info lock and add a lock object in the map for + // particular block info, added double checking mechanism to add the lock + // object so in case of concurrent query we for same block info only one lock + // object will be added + if (null == blockInfoLockObject) { + synchronized (blockInfoLock) { + // again checking the block info lock, to check whether lock object is present + // or not if now also not present then add a lock object + blockInfoLockObject = blockInfoLock.get(blockInfo); + if (null == blockInfoLockObject) { + blockInfoLockObject = new Object(); + blockInfoLock.put(blockInfo, blockInfoLockObject); + } + } + } + //acquire the lock for particular block info + synchronized (blockInfoLockObject) { + // check again whether block is present or not to avoid the + // same block is loaded + //more than once in case of concurrent query + tableBlock = (AbstractIndex) lruCache.get( + getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo)); + // if still block is not present then load the block + if (null == tableBlock) { + tableBlock = loadBlock(tableBlockUniqueIdentifier); + fillSegmentIdToBlockListMap(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), + blockInfo); + } + } + } else { + tableBlock.incrementAccessCount(); + } + return tableBlock; } /** - * Return the instance of this class - * - * @return singleton instance + * @param absoluteTableIdentifier + * @param blockInfo */ - public static BlockIndexStore getInstance() { - return CARBONTABLEBLOCKSINSTANCE; + private void fillSegmentIdToBlockListMap(AbsoluteTableIdentifier absoluteTableIdentifier, + BlockInfo blockInfo) { + TableSegmentUniqueIdentifier segmentIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, + blockInfo.getTableBlockInfo().getSegmentId()); + String uniqueTableSegmentIdentifier = segmentIdentifier.getUniqueTableSegmentIdentifier(); + 
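get() above builds a per-block double-checked lock by hand: probe blockInfoLock, synchronize on the whole map, re-probe, then publish a fresh lock object. On Java 8 the same per-key lock can be obtained atomically; this is a hedged equivalent idiom, not a drop-in change to the patch.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class PerKeyLocks<K> {
  private final ConcurrentMap<K, Object> locks = new ConcurrentHashMap<>();

  /** Atomically returns the lock object for a key, creating it on first use. */
  public Object lockFor(K key) {
    // replaces the probe / synchronize / re-probe dance on a plain map
    return locks.computeIfAbsent(key, k -> new Object());
  }

  /** Drops the lock once the guarded resource is fully loaded. */
  public void release(K key) {
    locks.remove(key);
  }
}

Usage mirrors the blockInfoLock lifecycle: synchronized (locks.lockFor(blockInfo)) { load }, then release(blockInfo) once the block can no longer race, exactly as loadBlock removes its lock entry after the build.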
List blockInfos = + segmentIdToBlockListMap.get(uniqueTableSegmentIdentifier); + if (null == blockInfos) { + Object segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier); + if (null == segmentLockObject) { + synchronized (segmentIDLock) { + segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier); + if (null == segmentLockObject) { + segmentLockObject = new Object(); + segmentIDLock.put(uniqueTableSegmentIdentifier, segmentLockObject); + } + } + } + synchronized (segmentLockObject) { + blockInfos = + segmentIdToBlockListMap.get(segmentIdentifier.getUniqueTableSegmentIdentifier()); + if (null == blockInfos) { + blockInfos = new CopyOnWriteArrayList<>(); + segmentIdToBlockListMap.put(uniqueTableSegmentIdentifier, blockInfos); + } + blockInfos.add(blockInfo); + } + } else { + blockInfos.add(blockInfo); + } } /** - * below method will be used to load the block which are not loaded and to - * get the loaded blocks if all the blocks which are passed is loaded then - * it will not load , else it will load. + * The method takes list of tableblocks as input and load them in btree lru cache + * and returns the list of data blocks meta * - * @param tableBlocksInfos list of blocks to be loaded - * @param absoluteTableIdentifier absolute Table Identifier to identify the table - * @throws IndexBuilderException + * @param tableBlocksInfos List of unique table blocks + * @return List + * @throws CarbonUtilException */ - public List loadAndGetBlocks(List tableBlocksInfos, - AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException { + @Override public List getAll(List tableBlocksInfos) + throws CarbonUtilException { AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()]; - addTableLockObject(absoluteTableIdentifier); - - // get the instance - Object lockObject = tableLockMap.get(absoluteTableIdentifier); - Map tableBlockMapTemp = null; int numberOfCores = 1; try { numberOfCores = Integer.parseInt(CarbonProperties.getInstance() @@ -132,110 +171,59 @@ public List loadAndGetBlocks(List tableBlocksInfo numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); } ExecutorService executor = Executors.newFixedThreadPool(numberOfCores); - // Acquire the lock to ensure only one query is loading the table blocks - // if same block is assigned to both the queries - List blockInfosNeedToLoad = null; - synchronized (lockObject) { - tableBlockMapTemp = tableBlocksMap.get(absoluteTableIdentifier); - // if it is loading for first time - if (null == tableBlockMapTemp) { - tableBlockMapTemp = new ConcurrentHashMap(); - tableBlocksMap.put(absoluteTableIdentifier, tableBlockMapTemp); - } - blockInfosNeedToLoad = fillSegmentIdToTableInfoMap(tableBlocksInfos, absoluteTableIdentifier); - } - AbstractIndex tableBlock = null; - int counter = -1; - for (BlockInfo blockInfo : blockInfosNeedToLoad) { - counter++; - // if table block is already loaded then do not load - // that block - tableBlock = tableBlockMapTemp.get(blockInfo); - // if block is not loaded - if (null == tableBlock) { - // check any lock object is present in - // block info lock map - Object blockInfoLockObject = blockInfoLock.get(blockInfo); - // if lock object is not present then acquire - // the lock in block info lock and add a lock object in the map for - // particular block info, added double checking mechanism to add the lock - // object so in case of concurrent query we for same block info only one lock - // object will be added - if (null == blockInfoLockObject) { - 
synchronized (blockInfoLock) { - // again checking the block info lock, to check whether lock object is present - // or not if now also not present then add a lock object - blockInfoLockObject = blockInfoLock.get(blockInfo); - if (null == blockInfoLockObject) { - blockInfoLockObject = new Object(); - blockInfoLock.put(blockInfo, blockInfoLockObject); - } - } - } - //acquire the lock for particular block info - synchronized (blockInfoLockObject) { - // check again whether block is present or not to avoid the - // same block is loaded - //more than once in case of concurrent query - tableBlock = tableBlockMapTemp.get(blockInfo); - // if still block is not present then load the block - if (null == tableBlock) { - if (null == mapOfBlockInfoToFuture.get(blockInfo)) { - mapOfBlockInfoToFuture.put(blockInfo, executor - .submit(new BlockLoaderThread(blockInfo, tableBlockMapTemp))); - } - } else { - loadedBlock[counter] = tableBlock; - } - } - } else { - // if blocks is already loaded then directly set the block at particular position - //so block will be present in sorted order - loadedBlock[counter] = tableBlock; - } + List> blocksList = new ArrayList>(); + for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : tableBlocksInfos) { + blocksList.add(executor.submit(new BlockLoaderThread(tableBlockUniqueIdentifier))); } // shutdown the executor gracefully and wait until all the task is finished executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.HOURS); } catch (InterruptedException e) { - throw new IndexBuilderException(e); + IndexBuilderException indexBuilderException = new IndexBuilderException(e); + throw new CarbonUtilException(indexBuilderException.getMessage(), indexBuilderException); } // fill the block which were not loaded before to loaded blocks array - fillLoadedBlocks(loadedBlock, blockInfosNeedToLoad); + fillLoadedBlocks(loadedBlock, blocksList); return Arrays.asList(loadedBlock); } + private String getLruCacheKey(AbsoluteTableIdentifier absoluteTableIdentifier, + BlockInfo blockInfo) { + CarbonTableIdentifier carbonTableIdentifier = + absoluteTableIdentifier.getCarbonTableIdentifier(); + return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE + + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + blockInfo + .getBlockUniqueName(); + } + /** - * Below method will be used to fill segment id to its block mapping map. 
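getAll above fans one loader task per block onto a fixed pool and later reads the futures back in submission order, which keeps the result array aligned with the request list. The skeleton of that pattern, with generic stand-ins for TableBlockUniqueIdentifier and the per-block get():

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public final class ParallelLoader {
  /** Loads every id on the pool; the result list lines up with the input list. */
  public static <I, B> List<B> loadAll(List<I> ids, final Function<I, B> loadOne,
      int threads) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    List<Future<B>> futures = new ArrayList<>(ids.size());
    for (final I id : ids) {
      futures.add(executor.submit(new Callable<B>() {
        @Override public B call() throws Exception {
          return loadOne.apply(id);
        }
      }));
    }
    // stop accepting work, then wait for the submitted loads to drain
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.HOURS);
    List<B> blocks = new ArrayList<>(ids.size());
    for (Future<B> future : futures) {
      blocks.add(future.get()); // rethrows a failed load as ExecutionException
    }
    return blocks;
  }
}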
-   * it will group all the table block info based on segment id and it will fill
+   * method returns the B-Tree meta
    *
-   * @param tableBlockInfos table block infos
-   * @param absoluteTableIdentifier absolute table identifier
+   * @param tableBlockUniqueIdentifier Unique table block info
+   * @return
    */
-  private List<BlockInfo> fillSegmentIdToTableInfoMap(List<TableBlockInfo> tableBlockInfos,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    Map<String, List<BlockInfo>> map = segmentIdToBlockListMap.get(absoluteTableIdentifier);
-    if (null == map) {
-      map = new ConcurrentHashMap<String, List<BlockInfo>>();
-      segmentIdToBlockListMap.put(absoluteTableIdentifier, map);
+  @Override public AbstractIndex getIfPresent(
+      TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    BlockIndex cacheable = (BlockIndex) lruCache
+        .get(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
+    if (null != cacheable) {
+      cacheable.incrementAccessCount();
     }
-    BlockInfo temp = null;
-    List<BlockInfo> blockInfosNeedToLoad = new ArrayList<>();
+    return cacheable;
+  }
 
-    for (TableBlockInfo info : tableBlockInfos) {
-      List<BlockInfo> tempTableBlockInfos = map.get(info.getSegmentId());
-      if (null == tempTableBlockInfos) {
-        tempTableBlockInfos = new ArrayList<>();
-        map.put(info.getSegmentId(), tempTableBlockInfos);
-      }
-      temp = new BlockInfo(info);
-      if (!tempTableBlockInfos.contains(temp)) {
-        tempTableBlockInfos.add(temp);
-      }
-      blockInfosNeedToLoad.add(temp);
-    }
-    return blockInfosNeedToLoad;
+  /**
+   * the method removes the entry from cache.
+   *
+   * @param tableBlockUniqueIdentifier
+   */
+  @Override public void invalidate(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    lruCache
+        .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
   }
 
   /**
@@ -244,124 +232,96 @@ private List fillSegmentIdToTableInfoMap(List tableBl
    *
    * @param loadedBlockArray array of blocks which will be filled
    * @param blocksList blocks loaded in thread
-   * @throws IndexBuilderException in case of any failure
+   * @throws CarbonUtilException in case of any failure
    */
-  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray, List<BlockInfo> blockInfos)
-      throws IndexBuilderException {
+  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
+      List<Future<AbstractIndex>> blocksList) throws CarbonUtilException {
+    int blockCounter = 0;
+    boolean exceptionOccurred = false;
+    Throwable exceptionRef = null;
     for (int i = 0; i < loadedBlockArray.length; i++) {
-      if (null == loadedBlockArray[i]) {
-        try {
-          loadedBlockArray[i] = mapOfBlockInfoToFuture.get(blockInfos.get(i)).get();
-        } catch (InterruptedException | ExecutionException e) {
-          throw new IndexBuilderException(e);
-        }
+      try {
+        loadedBlockArray[i] = blocksList.get(blockCounter++).get();
+      } catch (Throwable e) {
+        exceptionOccurred = true;
+        exceptionRef = e;
       }
-
+    }
+    if (exceptionOccurred) {
+      LOGGER.error("Block B-Tree loading failed. 
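Both the success path (incrementAccessCount) and this failure path (clear) revolve around pinning: a block a query still holds must not be evicted. A minimal sketch of that reference-count contract; the PinnableEntry name and isEvictable check are illustrative, not the Cacheable interface itself.

import java.util.concurrent.atomic.AtomicInteger;

public class PinnableEntry {
  private final AtomicInteger accessCount = new AtomicInteger();

  /** A reader pins the entry before using it. */
  public void incrementAccessCount() {
    accessCount.incrementAndGet();
  }

  /** The reader unpins the entry when the query finishes. */
  public void clear() {
    accessCount.decrementAndGet();
  }

  /** LRU eviction may reclaim the entry only when nobody holds it. */
  public boolean isEvictable() {
    return accessCount.get() == 0;
  }
}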
Clearing the access count of the loaded blocks."); + // in case of any failure clear the access count for the valid loaded blocks + clearAccessCountForLoadedBlocks(loadedBlockArray); + throw new CarbonUtilException("Block B-tree loading failed", exceptionRef); } } - private AbstractIndex loadBlock(Map tableBlockMapTemp, - BlockInfo blockInfo) throws CarbonUtilException { - AbstractIndex tableBlock; - DataFileFooter footer; - // getting the data file meta data of the block - footer = CarbonUtil.readMetadatFile(blockInfo.getTableBlockInfo()); - tableBlock = new BlockIndex(); - footer.setBlockInfo(blockInfo); - // building the block - tableBlock.buildIndex(Arrays.asList(footer)); - tableBlockMapTemp.put(blockInfo, tableBlock); - // finally remove the lock object from block info lock as once block is loaded - // it will not come inside this if condition - blockInfoLock.remove(blockInfo); - return tableBlock; + /** + * This method will clear the access count for the loaded blocks + * + * @param loadedBlockArray + */ + private void clearAccessCountForLoadedBlocks(AbstractIndex[] loadedBlockArray) { + for (int i = 0; i < loadedBlockArray.length; i++) { + if (null != loadedBlockArray[i]) { + loadedBlockArray[i].clear(); + } + } } /** - * Method to add table level lock if lock is not present for the table - * - * @param absoluteTableIdentifier + * Thread class which will be used to load the blocks */ - private synchronized void addTableLockObject(AbsoluteTableIdentifier absoluteTableIdentifier) { - // add the instance to lock map if it is not present - if (null == tableLockMap.get(absoluteTableIdentifier)) { - tableLockMap.put(absoluteTableIdentifier, new Object()); + private class BlockLoaderThread implements Callable { + // table block unique identifier + private TableBlockUniqueIdentifier tableBlockUniqueIdentifier; + + private BlockLoaderThread(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) { + this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier; + } + + @Override public AbstractIndex call() throws Exception { + // load and return the loaded blocks + return get(tableBlockUniqueIdentifier); } } + private AbstractIndex loadBlock(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) + throws CarbonUtilException { + AbstractIndex tableBlock = new BlockIndex(); + BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo()); + String lruCacheKey = + getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo); + checkAndLoadTableBlocks(tableBlock, tableBlockUniqueIdentifier, lruCacheKey); + // finally remove the lock object from block info lock as once block is loaded + // it will not come inside this if condition + blockInfoLock.remove(blockInfo); + return tableBlock; + } + /** * This will be used to remove a particular blocks useful in case of * deletion of some of the blocks in case of retention or may be some other * scenario * - * @param segmentsToBeRemoved list of segments to be removed + * @param segmentIds list of table blocks to be removed * @param absoluteTableIdentifier absolute table identifier */ - public void removeTableBlocks(List segmentsToBeRemoved, + public void removeTableBlocks(List segmentIds, AbsoluteTableIdentifier absoluteTableIdentifier) { - // get the lock object if lock object is not present then it is not - // loaded at all - // we can return from here - Object lockObject = tableLockMap.get(absoluteTableIdentifier); - if (null == lockObject) { - return; - } - Map map = 
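removeTableBlocks below works because fillSegmentIdToBlockListMap maintained the reverse index from segment to its blocks, turning "drop a segment" into a short list of LRU removals. The shape of that bookkeeping, sketched over plain maps with the SizedLruCache above standing in for the real cache:

import java.util.List;
import java.util.Map;

public final class SegmentEviction {
  /** Drops every cached block of each segment; unknown segments are skipped. */
  public static void removeSegments(List<String> segmentKeys,
      Map<String, List<String>> segmentToBlockKeys, SizedLruCache<String> lruCache) {
    for (String segmentKey : segmentKeys) {
      List<String> blockKeys = segmentToBlockKeys.remove(segmentKey);
      if (blockKeys == null) {
        continue; // segment was never loaded on this node
      }
      for (String blockKey : blockKeys) {
        lruCache.remove(blockKey); // each block owns exactly one LRU entry
      }
    }
  }
}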
tableBlocksMap.get(absoluteTableIdentifier); - // if there is no loaded blocks then return - if (null == map || map.isEmpty()) { + if (null == segmentIds) { return; } - Map> segmentIdToBlockInfoMap = - segmentIdToBlockListMap.get(absoluteTableIdentifier); - if (null == segmentIdToBlockInfoMap || segmentIdToBlockInfoMap.isEmpty()) { - return; - } - synchronized (lockObject) { - for (String segmentId : segmentsToBeRemoved) { - List tableBlockInfoList = segmentIdToBlockInfoMap.remove(segmentId); - if (null == tableBlockInfoList) { - continue; - } - Iterator tableBlockInfoIterator = tableBlockInfoList.iterator(); - while (tableBlockInfoIterator.hasNext()) { - BlockInfo info = tableBlockInfoIterator.next(); - map.remove(info); + for (String segmentId : segmentIds) { + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId); + List blockInfos = segmentIdToBlockListMap + .remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + if (null != blockInfos) { + for (BlockInfo blockInfo : blockInfos) { + String lruCacheKey = getLruCacheKey(absoluteTableIdentifier, blockInfo); + lruCache.remove(lruCacheKey); } } } } - - /** - * remove all the details of a table this will be used in case of drop table - * - * @param absoluteTableIdentifier absolute table identifier to find the table - */ - public void clear(AbsoluteTableIdentifier absoluteTableIdentifier) { - // removing all the details of table - tableLockMap.remove(absoluteTableIdentifier); - tableBlocksMap.remove(absoluteTableIdentifier); - } - - /** - * Thread class which will be used to load the blocks - */ - private class BlockLoaderThread implements Callable { - /** - * table block info to block index map - */ - private Map tableBlockMap; - - // block info - private BlockInfo blockInfo; - - private BlockLoaderThread(BlockInfo blockInfo, Map tableBlockMap) { - this.tableBlockMap = tableBlockMap; - this.blockInfo = blockInfo; - } - - @Override public AbstractIndex call() throws Exception { - // load and return the loaded blocks - return loadBlock(tableBlockMap, blockInfo); - } - } } diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java index 6ab18bb0cf5..83e06e98aa6 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java @@ -29,37 +29,35 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CarbonLRUCache; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndex; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonUtil; import 
org.apache.carbondata.core.util.CarbonUtilException; /** - * Singleton Class to handle loading, unloading,clearing,storing of the table + * Class to handle loading, unloading,clearing,storing of the table * blocks */ -public class SegmentTaskIndexStore { - +public class SegmentTaskIndexStore + implements Cache { private static final LogService LOGGER = LogServiceFactory.getLogService(SegmentTaskIndexStore.class.getName()); /** - * singleton instance + * carbon store path */ - private static final SegmentTaskIndexStore SEGMENTTASKINDEXSTORE = new SegmentTaskIndexStore(); - + protected String carbonStorePath; /** - * mapping of table identifier to map of segmentId_taskId to table segment - * reason of so many map as each segment can have multiple data file and - * each file will have its own btree + * CarbonLRU cache */ - private Map>> tableSegmentMap; + protected CarbonLRUCache lruCache; /** * map of block info to lock object map, while loading the btree this will be filled @@ -70,28 +68,82 @@ public class SegmentTaskIndexStore { private Map segmentLockMap; /** - * table and its lock object to this will be useful in case of concurrent - * query scenario when more than one query comes for same table and in that - * case it will ensure that only one query will able to load the blocks + * constructor to initialize the SegmentTaskIndexStore + * + * @param carbonStorePath + * @param lruCache */ - private Map tableLockMap; - - private SegmentTaskIndexStore() { - tableSegmentMap = - new ConcurrentHashMap<>( - CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - tableLockMap = new ConcurrentHashMap( - CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + public SegmentTaskIndexStore(String carbonStorePath, CarbonLRUCache lruCache) { + this.carbonStorePath = carbonStorePath; + this.lruCache = lruCache; segmentLockMap = new ConcurrentHashMap(); } + @Override + public SegmentTaskIndexWrapper get(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) + throws CarbonUtilException { + SegmentTaskIndexWrapper segmentTaskIndexWrapper = null; + try { + segmentTaskIndexWrapper = + loadAndGetTaskIdToSegmentsMap(tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(), + tableSegmentUniqueIdentifier.getAbsoluteTableIdentifier(), + tableSegmentUniqueIdentifier); + } catch (IndexBuilderException e) { + throw new CarbonUtilException(e.getMessage(), e); + } + if (null != segmentTaskIndexWrapper) { + segmentTaskIndexWrapper.incrementAccessCount(); + } + return segmentTaskIndexWrapper; + } + /** - * Return the instance of this class + * returns all the segments taskid_to_Blcoks map wrapper. 
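Both stores now sit behind one generic cache contract, which is what lets CacheProvider hand them out interchangeably with the dictionary caches. A reconstruction of that contract as implied by the overrides in this patch; signatures are inferred from the call sites, not copied from Cache.java.

import java.util.List;

public interface Cache<K, V> {
  /** Loads the value if absent, pins it, and returns it. */
  V get(K key) throws Exception;

  /** Batch form of get(); the result order matches the key order. */
  List<V> getAll(List<K> keys) throws Exception;

  /** Returns the pinned value only if it is already cached, otherwise null. */
  V getIfPresent(K key);

  /** Removes the entry from the underlying LRU cache. */
  void invalidate(K key);
}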
* - * @return singleton instance + * @param tableSegmentUniqueIdentifiers + * @return + * @throws CarbonUtilException */ - public static SegmentTaskIndexStore getInstance() { - return SEGMENTTASKINDEXSTORE; + @Override public List getAll( + List tableSegmentUniqueIdentifiers) throws CarbonUtilException { + List segmentTaskIndexWrappers = + new ArrayList<>(tableSegmentUniqueIdentifiers.size()); + try { + for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) { + segmentTaskIndexWrappers.add(get(segmentUniqueIdentifier)); + } + } catch (CarbonUtilException e) { + for (SegmentTaskIndexWrapper segmentTaskIndexWrapper : segmentTaskIndexWrappers) { + segmentTaskIndexWrapper.clear(); + } + throw new CarbonUtilException("Problem in loading segment blocks.", e); + } + return segmentTaskIndexWrappers; + } + + /** + * returns the SegmentTaskIndexWrapper + * + * @param tableSegmentUniqueIdentifier + * @return + */ + @Override public SegmentTaskIndexWrapper getIfPresent( + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) { + SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache + .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + if (null != segmentTaskIndexWrapper) { + segmentTaskIndexWrapper.incrementAccessCount(); + } + return segmentTaskIndexWrapper; + } + + /** + * method invalidate the segment cache for segment + * + * @param tableSegmentUniqueIdentifier + */ + @Override public void invalidate(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) { + lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } /** @@ -105,148 +157,97 @@ public static SegmentTaskIndexStore getInstance() { * @return map of taks id to segment mapping * @throws IndexBuilderException */ - public Map loadAndGetTaskIdToSegmentsMap( + private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap( Map> segmentToTableBlocksInfos, - AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException { + AbsoluteTableIdentifier absoluteTableIdentifier, + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) + throws IndexBuilderException, CarbonUtilException { // task id to segment map - Map taskIdToTableSegmentMap = - new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - addLockObject(absoluteTableIdentifier); - Iterator>> iteratorOverSegmentBlocksInfos = + Iterator>> iteratorOverSegmentBlocksInfos = segmentToTableBlocksInfos.entrySet().iterator(); - Map> tableSegmentMapTemp = - addTableSegmentMap(absoluteTableIdentifier); Map taskIdToSegmentIndexMap = null; - String segmentId = null; - TaskBucketHolder taskId = null; + SegmentTaskIndexWrapper segmentTaskIndexWrapper = null; + TaskBucketHolder taskBucketHolder = null; try { while (iteratorOverSegmentBlocksInfos.hasNext()) { // segment id to table block mapping - Entry> next = iteratorOverSegmentBlocksInfos.next(); + iteratorOverSegmentBlocksInfos.next(); // group task id to table block info mapping for the segment Map> taskIdToTableBlockInfoMap = mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos); // get the existing map of task id to table segment map - segmentId = next.getKey(); // check if segment is already loaded, if segment is already loaded //no need to load the segment block - taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId); - if (taskIdToSegmentIndexMap == null) { + String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(); + segmentTaskIndexWrapper = 
(SegmentTaskIndexWrapper) lruCache.get(lruCacheKey); + if (segmentTaskIndexWrapper == null) { // get the segment loader lock object this is to avoid // same segment is getting loaded multiple times // in case of concurrent query - Object segmentLoderLockObject = segmentLockMap.get(segmentId); + Object segmentLoderLockObject = segmentLockMap.get(lruCacheKey); if (null == segmentLoderLockObject) { - segmentLoderLockObject = addAndGetSegmentLock(segmentId); + segmentLoderLockObject = addAndGetSegmentLock(lruCacheKey); } // acquire lock to lod the segment synchronized (segmentLoderLockObject) { - taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId); - if (null == taskIdToSegmentIndexMap) { - // creating a map of task id to table segment - taskIdToSegmentIndexMap = new ConcurrentHashMap(); - Iterator>> iterator = + segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey); + if (null == segmentTaskIndexWrapper) { + // creating a map of take if to table segment + taskIdToSegmentIndexMap = new HashMap(); + segmentTaskIndexWrapper = new SegmentTaskIndexWrapper(taskIdToSegmentIndexMap); + Iterator>> iterator = taskIdToTableBlockInfoMap.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> taskToBlockInfoList = iterator.next(); - taskId = taskToBlockInfoList.getKey(); - taskIdToSegmentIndexMap.put(taskId, - loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier)); + long requiredSize = + calculateRequiredSize(taskIdToTableBlockInfoMap, absoluteTableIdentifier); + segmentTaskIndexWrapper.setMemorySize(requiredSize); + boolean isAddedToLruCache = + lruCache.put(lruCacheKey, segmentTaskIndexWrapper, requiredSize); + if (isAddedToLruCache) { + while (iterator.hasNext()) { + Map.Entry> taskToBlockInfoList = + iterator.next(); + taskBucketHolder = taskToBlockInfoList.getKey(); + taskIdToSegmentIndexMap.put(taskBucketHolder, + loadBlocks(taskBucketHolder, taskToBlockInfoList.getValue(), + absoluteTableIdentifier)); + } + } else { + throw new IndexBuilderException( + "Can not load the segment. 
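The inner loop above hangs everything off TaskBucketHolder: one index file per (taskNo, bucketNumber) becomes one B-Tree inside the wrapper's map. Grouping block infos under such a key is a plain bucket-by-key pass; a sketch with a string key standing in for TaskBucketHolder and the key extractor left abstract:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class TaskGrouping {
  /** Buckets blocks by task key; every task maps to the blocks it produced. */
  public static <B> Map<String, List<B>> groupByTask(List<B> blocks, KeyOf<B> keyOf) {
    Map<String, List<B>> grouped = new HashMap<>();
    for (B block : blocks) {
      String taskKey = keyOf.taskKey(block);
      List<B> bucket = grouped.get(taskKey);
      if (bucket == null) {
        bucket = new ArrayList<>();
        grouped.put(taskKey, bucket);
      }
      bucket.add(block);
    }
    return grouped;
  }

  /** Extracts the task key from a block; a stand-in for the DataFileUtil lookup. */
  public interface KeyOf<B> {
    String taskKey(B block);
  }
}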
No Enough space available."); } - tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap); + //tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap); // removing from segment lock map as once segment is loaded //if concurrent query is coming for same segment // it will wait on the lock so after this segment will be already //loaded so lock is not required, that is why removing the // the lock object as it wont be useful - segmentLockMap.remove(segmentId); + segmentLockMap.remove(lruCacheKey); } } - taskIdToTableSegmentMap.putAll(taskIdToSegmentIndexMap); } } - } catch (CarbonUtilException e) { + } catch (IndexBuilderException e) { LOGGER.error("Problem while loading the segment"); throw new IndexBuilderException(e); } - return taskIdToTableSegmentMap; + return segmentTaskIndexWrapper; } - /** - * Below method will be used to get the segment level lock object - * - * @param segmentId - * @return lock object - */ - private synchronized Object addAndGetSegmentLock(String segmentId) { - // get the segment lock object if it is present then return - // otherwise add the new lock and return - Object segmentLoderLockObject = segmentLockMap.get(segmentId); - if (null == segmentLoderLockObject) { - segmentLoderLockObject = new Object(); - segmentLockMap.put(segmentId, segmentLoderLockObject); - } - return segmentLoderLockObject; - } - - /** - * Below code is to add table lock map which will be used to - * add - * - * @param absoluteTableIdentifier - */ - private synchronized void addLockObject(AbsoluteTableIdentifier absoluteTableIdentifier) { - // add the instance to lock map if it is not present - if (null == tableLockMap.get(absoluteTableIdentifier)) { - tableLockMap.put(absoluteTableIdentifier, new Object()); - } - } - - /** - * Below method will be used to get the table segment map - * if table segment is not present then it will add and return - * - * @param absoluteTableIdentifier - * @return table segment map - */ - private Map> addTableSegmentMap( + private long calculateRequiredSize( + Map> taskIdToTableBlockInfoMap, AbsoluteTableIdentifier absoluteTableIdentifier) { - // get the instance of lock object - Object lockObject = tableLockMap.get(absoluteTableIdentifier); - Map> tableSegmentMapTemp = - tableSegmentMap.get(absoluteTableIdentifier); - if (null == tableSegmentMapTemp) { - synchronized (lockObject) { - // segment id to task id to table segment map - tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier); - if (null == tableSegmentMapTemp) { - tableSegmentMapTemp = new ConcurrentHashMap<>(); - tableSegmentMap.put(absoluteTableIdentifier, tableSegmentMapTemp); - } - } + Iterator>> iterator = + taskIdToTableBlockInfoMap.entrySet().iterator(); + TaskBucketHolder taskBucketHolder; + long driverBTreeSize = 0; + while (iterator.hasNext()) { + Map.Entry> taskToBlockInfoList = iterator.next(); + taskBucketHolder = taskToBlockInfoList.getKey(); + driverBTreeSize += CarbonUtil + .calculateDriverBTreeSize(taskBucketHolder.taskNo, taskBucketHolder.bucketNumber, + taskToBlockInfoList.getValue(), absoluteTableIdentifier); } - return tableSegmentMapTemp; - } - - /** - * Below method will be used to load the blocks - * - * @param tableBlockInfoList - * @return loaded segment - * @throws CarbonUtilException - */ - private AbstractIndex loadBlocks(TaskBucketHolder holder, List tableBlockInfoList, - AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException { - // all the block of one task id will be loaded together - // so creating a list which will have all the 
data file meta data to of one task - List footerList = CarbonUtil - .readCarbonIndexFile(holder.taskNo, holder.bucketNumber, tableBlockInfoList, - tableIdentifier); - AbstractIndex segment = new SegmentTaskIndex(); - // file path of only first block is passed as it all table block info path of - // same task id will be same - segment.buildIndex(footerList); - return segment; + return driverBTreeSize; } /** @@ -282,60 +283,71 @@ private Map> mappedAndGetTaskIdToTableBlo } /** - * remove all the details of a table this will be used in case of drop table + * Below method will be used to get the segment level lock object + * + * @param segmentId + * @return lock object + */ + private synchronized Object addAndGetSegmentLock(String segmentId) { + // get the segment lock object if it is present then return + // otherwise add the new lock and return + Object segmentLoderLockObject = segmentLockMap.get(segmentId); + if (null == segmentLoderLockObject) { + segmentLoderLockObject = new Object(); + segmentLockMap.put(segmentId, segmentLoderLockObject); + } + return segmentLoderLockObject; + } + + /** + * Below method will be used to load the blocks * - * @param absoluteTableIdentifier absolute table identifier to find the table + * @param tableBlockInfoList + * @return loaded segment + * @throws CarbonUtilException */ - public void clear(AbsoluteTableIdentifier absoluteTableIdentifier) { - // removing all the details of table - tableLockMap.remove(absoluteTableIdentifier); - tableSegmentMap.remove(absoluteTableIdentifier); + private AbstractIndex loadBlocks(TaskBucketHolder taskBucketHolder, + List tableBlockInfoList, AbsoluteTableIdentifier tableIdentifier) + throws CarbonUtilException { + // all the block of one task id will be loaded together + // so creating a list which will have all the data file meta data to of one task + List footerList = CarbonUtil + .readCarbonIndexFile(taskBucketHolder.taskNo, taskBucketHolder.bucketNumber, + tableBlockInfoList, tableIdentifier); + AbstractIndex segment = new SegmentTaskIndex(); + // file path of only first block is passed as it all table block info path of + // same task id will be same + segment.buildIndex(footerList); + return segment; } /** - * Below method will be used to remove the segment block based on + * Below method will be used to remove the segment based on * segment id is passed * * @param segmentToBeRemoved segment to be removed * @param absoluteTableIdentifier absoluteTableIdentifier */ - public void removeTableBlocks(List segmentToBeRemoved, + public void removeSegments(List segmentToBeRemoved, AbsoluteTableIdentifier absoluteTableIdentifier) { - // get the lock object if lock object is not present then it is not - // loaded at all - // we can return from here - Object lockObject = tableLockMap.get(absoluteTableIdentifier); - if (null == lockObject) { - return; - } - // Acquire the lock and remove only those instance which was loaded - Map> map = - tableSegmentMap.get(absoluteTableIdentifier); - // if there is no loaded blocks then return - if (null == map) { - return; - } for (String segmentId : segmentToBeRemoved) { - map.remove(segmentId); + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId); + lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } } /** - * Below method will be used to check if segment blocks - * is already loaded or not + * The method clears the access count of table segments * - * @param absoluteTableIdentifier 
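clear(...) above walks the identifiers a query pinned and unpins each wrapper. One defensive wrinkle is that an entry can be evicted between pin and release, so a null from the cache probe deserves a guard. A sketch reusing the PinnableEntry above:

import java.util.List;

public final class BatchRelease {
  /** Unpins every wrapper a finished query pinned; tolerates evicted entries. */
  public static void releaseAll(List<PinnableEntry> pinned) {
    for (PinnableEntry entry : pinned) {
      if (entry != null) { // the entry may have been invalidated concurrently
        entry.clear();
      }
    }
  }
}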
- * @param segmentId - * @return is loaded then return the loaded blocks otherwise null + * @param tableSegmentUniqueIdentifiers */ - public Map getSegmentBTreeIfExists( - AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) { - Map> tableSegment = - tableSegmentMap.get(absoluteTableIdentifier); - if (null == tableSegment) { - return null; + public void clear(List tableSegmentUniqueIdentifiers) { + for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) { + SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache + .get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + cacheable.clear(); } - return tableSegment.get(segmentId); } public static class TaskBucketHolder implements Serializable { diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java new file mode 100644 index 00000000000..ffde93f1119 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.carbon.datastore; + +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.constants.CarbonCommonConstants; + +/** + * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment + */ +public class TableSegmentUniqueIdentifier { + /** + * table fully qualified identifier + */ + private AbsoluteTableIdentifier absoluteTableIdentifier; + + /** + * segment to tableBlockInfo map + */ + Map> segmentToTableBlocksInfos; + + private String segmentId; + + /** + * Constructor to initialize the class instance + * @param absoluteTableIdentifier + * @param segmentId + */ + public TableSegmentUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier, + String segmentId) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + this.segmentId = segmentId; + } + + public TableSegmentUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier, + Map> segmentToTableBlocksInfos, String segmentId) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + this.segmentToTableBlocksInfos = segmentToTableBlocksInfos; + this.segmentId = segmentId; + } + + /** + * returns AbsoluteTableIdentifier + * @return + */ + public AbsoluteTableIdentifier getAbsoluteTableIdentifier() { + return absoluteTableIdentifier; + } + + public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + } + + /** + * returns the segment to tableBlockInfo map + * @return + */ + public Map> getSegmentToTableBlocksInfos() { + return segmentToTableBlocksInfos; + } + + /** + * set the segment to tableBlockInfo map + * @param segmentToTableBlocksInfos + */ + public void setSegmentToTableBlocksInfos( + Map> segmentToTableBlocksInfos) { + this.segmentToTableBlocksInfos = segmentToTableBlocksInfos; + } + + public String getSegmentId() { + return segmentId; + } + + /** + * method returns the id to uniquely identify a key + * + * @return + */ + public String getUniqueTableSegmentIdentifier() { + CarbonTableIdentifier carbonTableIdentifier = + absoluteTableIdentifier.getCarbonTableIdentifier(); + return carbonTableIdentifier.getDatabaseName() + + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier + .getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId; + } + + /** + * equals method to compare two objects having same + * absoluteIdentifier and segmentId + * @param o + * @return + */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TableSegmentUniqueIdentifier that = (TableSegmentUniqueIdentifier) o; + + if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) return false; + return segmentId.equals(that.segmentId); + + } + + /** + * Returns hashcode for the TableSegmentIdentifier + * @return + */ + @Override public int hashCode() { + int result = absoluteTableIdentifier.hashCode(); + result = 31 * result + segmentId.hashCode(); + return result; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java index 7e1ed8c0a16..dd712fa5868 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java @@ -19,11 +19,13 @@ package org.apache.carbondata.core.carbon.datastore.block; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.carbondata.core.cache.Cacheable; import org.apache.carbondata.core.carbon.datastore.DataRefNode; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; -public abstract class AbstractIndex { +public abstract class AbstractIndex implements Cacheable { /** * vo class which will hold the RS information of the block @@ -40,6 +42,16 @@ public abstract class AbstractIndex { */ protected long totalNumberOfRows; + /** + * atomic integer to maintain the access count for a block access + */ + protected AtomicInteger accessCount = new AtomicInteger(); + + /** + * Table block meta size. + */ + protected long memorySize; + /** * @return the totalNumberOfRows */ @@ -61,10 +73,64 @@ public DataRefNode getDataRefNode() { return dataRefNode; } + @Override public long getFileTimeStamp() { + return 0; + } + /** * Below method will be used to load the data block * - * @param blockInfo block detail + * @param footerList footer list */ public abstract void buildIndex(List footerList); + + /** + * the method returns the access count + * + * @return + */ + @Override public int getAccessCount() { + return accessCount.get(); + } + + /** + * The method returns table block size + * + * @return + */ + @Override public long getMemorySize() { + return this.memorySize; + } + + /** + * The method increments the access count + */ + public void incrementAccessCount() { + accessCount.incrementAndGet(); + } + + /** + * This method will release the objects and set default value for primitive types + */ + public void clear() { + decrementAccessCount(); + } + + /** + * This method will decrement the access count for a block by 1 + * whenever a block usage is complete + */ + private void decrementAccessCount() { + if (accessCount.get() > 0) { + accessCount.decrementAndGet(); + } + } + + /** + * the method is used to set the memory size of the b-tree + * @param memorySize + */ + public void setMemorySize(long memorySize) { + this.memorySize = memorySize; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java index 96879aa6d3b..2a49daf2313 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java @@ -18,6 +18,9 @@ */ package org.apache.carbondata.core.carbon.datastore.block; +import org.apache.carbondata.core.carbon.path.CarbonTablePath; +import org.apache.carbondata.core.constants.CarbonCommonConstants; + /** * Below class will be used to store table block info * As in blocklet distribution we are dividing the same block @@ -32,6 +35,10 @@ public class BlockInfo { * about the block */ private TableBlockInfo info; + /** + * unique blockName + */ + private String blockUniqueName; /** * Constructor @@ -40,6 +47,18 @@ public class BlockInfo { */ public BlockInfo(TableBlockInfo info) { this.info = info; + init(); + } + + /** + * init the block unique name + */ + private void init() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(this.info.getSegmentId()); + 
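// the unique name has the form <segmentId>/<carbondata file name> +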
stringBuilder.append(CarbonCommonConstants.FILE_SEPARATOR); + stringBuilder.append(CarbonTablePath.getCarbonDataFileName(this.info.getFilePath())); + this.blockUniqueName = stringBuilder.toString(); } /** @@ -104,4 +123,12 @@ public void setTableBlockInfo(TableBlockInfo info) { } return true; } + + /** + * returns the unique block name + * @return + */ + public String getBlockUniqueName() { + return blockUniqueName; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java new file mode 100644 index 00000000000..cd278b52595 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.carbon.datastore.block; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.carbondata.core.cache.Cacheable; +import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; + +/** + * SegmentTaskIndexWrapper class holds the taskIdToTableSegmentMap + */ +public class SegmentTaskIndexWrapper implements Cacheable { + + /** + * task_id to table segment index map + */ + private Map taskIdToTableSegmentMap; + /** + * atomic integer to maintain the access count for a segment access + */ + protected AtomicInteger accessCount = new AtomicInteger(); + + /** + * Table block meta size. 
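+ * Exposed to the LRU cache through getMemorySize() below so cached entries can be size-accounted.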
+ */ + protected long memorySize; + + public SegmentTaskIndexWrapper( + Map taskIdToTableSegmentMap) { + this.taskIdToTableSegmentMap = taskIdToTableSegmentMap; + } + + public Map getTaskIdToTableSegmentMap() { + return taskIdToTableSegmentMap; + } + + public void setTaskIdToTableSegmentMap( + Map taskIdToTableSegmentMap) { + this.taskIdToTableSegmentMap = taskIdToTableSegmentMap; + } + + /** + * sets the segment size + * + * @param memorySize + */ + public void setMemorySize(long memorySize) { + this.memorySize = memorySize; + } + + /** + * returns the timestamp + * + * @return + */ + @Override public long getFileTimeStamp() { + return 0; + } + + /** + * returns the access count + * + * @return + */ + @Override public int getAccessCount() { + return accessCount.get(); + } + + /** + * returns the memory size + * + * @return + */ + @Override public long getMemorySize() { + return memorySize; + } + + /** + * The method increments the access count + */ + public void incrementAccessCount() { + accessCount.incrementAndGet(); + } + + /** + * This method will release the objects and set default value for primitive types + */ + public void clear() { + decrementAccessCount(); + } + + /** + * This method will decrement the access count for a segment by 1 + * whenever a segment usage is complete + */ + private void decrementAccessCount() { + if (accessCount.get() > 0) { + accessCount.decrementAndGet(); + } + } + +} diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java new file mode 100644 index 00000000000..6e57e0fdb51 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java @@ -0,0 +1,72 @@ +package org.apache.carbondata.core.carbon.datastore.block; + +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; + +/** + * Class holds the info to uniquely identify a block + */ +public class TableBlockUniqueIdentifier { + + /** + * table fully qualified name + */ + private AbsoluteTableIdentifier absoluteTableIdentifier; + + /** + * table block info + */ + private TableBlockInfo tableBlockInfo; + + public TableBlockUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier, + TableBlockInfo tableBlockInfo) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + this.tableBlockInfo = tableBlockInfo; + } + + public AbsoluteTableIdentifier getAbsoluteTableIdentifier() { + return absoluteTableIdentifier; + } + + public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + } + + public TableBlockInfo getTableBlockInfo() { + return tableBlockInfo; + } + + public void setTableBlockInfo(TableBlockInfo tableBlockInfo) { + this.tableBlockInfo = tableBlockInfo; + } + + @Override public int hashCode() { + return this.absoluteTableIdentifier.hashCode() + this.tableBlockInfo.hashCode(); + } + + @Override public boolean equals(Object other) { + if (this == other) return true; + if (other == null || getClass() != other.getClass()) return false; + TableBlockUniqueIdentifier tableBlockUniqueIdentifier = (TableBlockUniqueIdentifier) other; + return this.absoluteTableIdentifier.equals(tableBlockUniqueIdentifier.absoluteTableIdentifier) + && 
this.tableBlockInfo.equals(tableBlockUniqueIdentifier.tableBlockInfo); } + + /** + * returns the String value to uniquely identify a block + * + * @return + */ + public String getUniqueTableBlockName() { + BlockInfo blockInfo = new BlockInfo(this.tableBlockInfo); + CarbonTableIdentifier carbonTableIdentifier = + this.absoluteTableIdentifier.getCarbonTableIdentifier(); + String uniqueTableBlockName = carbonTableIdentifier.getDatabaseName() + + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier + .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + + this.tableBlockInfo.getSegmentId() + + CarbonCommonConstants.FILE_SEPARATOR + blockInfo.hashCode(); + return uniqueTableBlockName; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java index cda971af3ca..8c9f2977748 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java @@ -428,4 +428,17 @@ public CarbonFile[] getSortIndexFiles(CarbonFile sortIndexDir, final String colu }); return files; } -} \ No newline at end of file + + /** + * returns the carbondata file name + * + * @param carbonDataFilePath carbondata file path + * @return + */ + public static String getCarbonDataFileName(String carbonDataFilePath) { + String carbonDataFileName = carbonDataFilePath + .substring(carbonDataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1, + carbonDataFilePath.indexOf(CARBON_DATA_EXT)); + return carbonDataFileName; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index f766042fad5..6c159e6c3f4 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -557,13 +557,18 @@ public final class CarbonCommonConstants { public static final String SCHEMAS_MODIFIED_TIME_FILE = "modifiedTime.mdt"; public static final String DEFAULT_INVISIBLE_DUMMY_MEASURE = "default_dummy_measure"; /** - * max level cache size upto which level cache will be loaded in memory + * max driver lru cache size up to which the lru cache will be loaded in memory */ - public static final String CARBON_MAX_LEVEL_CACHE_SIZE = "carbon.max.level.cache.size"; + public static final String CARBON_MAX_DRIVER_LRU_CACHE_SIZE = "carbon.max.driver.lru.cache.size"; /** - * max level cache size default value in GB + * max executor lru cache size up to which the lru cache will be loaded in memory */ - public static final String CARBON_MAX_LEVEL_CACHE_SIZE_DEFAULT = "-1"; + public static final String CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE = + "carbon.max.executor.lru.cache.size"; + /** + * max lru cache size default value in MB + */ + public static final String CARBON_MAX_LRU_CACHE_SIZE_DEFAULT = "-1"; /** * DOUBLE_VALUE_MEASURE */ @@ -991,6 +996,10 @@ public final class CarbonCommonConstants { */ public static final String DICTIONARY_SERVER_PORT_DEFAULT = "2030"; + /** + * property to indicate whether the current JVM instance is a driver + */ + public static final String IS_DRIVER_INSTANCE = "is.driver.instance"; private CarbonCommonConstants() { } } diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 
78cfc360207..41af79289c6 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -48,6 +48,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; @@ -59,6 +60,7 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.store.FileHolder; import org.apache.carbondata.core.datastorage.store.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer; import org.apache.carbondata.core.datastorage.store.compression.MeasureMetaDataModel; @@ -889,6 +891,31 @@ public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) } } + /** + * The method calculates the B-Tree metadata size of a carbondata file. + * @param filePath + * @param blockOffset + * @param blockLength + * @return + */ + public static long calculateMetaSize(String filePath, long blockOffset, long blockLength) { + FileHolder fileReader = null; + try { + long completeBlockLength = blockOffset + blockLength; + long footerPointer = completeBlockLength - 8; + fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)); + long actualFooterOffset = fileReader.readLong(filePath, footerPointer); + long size = footerPointer - actualFooterOffset; + return size; + } finally { + if (null != fileReader) { + fileReader.finish(); + } + } + } + + /** * Below method will be used to get the surrogate key * @@ -908,6 +935,44 @@ public static int getSurrogateKey(byte[] data, ByteBuffer buffer) { return surrogate; } + /** + * The method returns the size of the driver B-Tree for a particular taskId + * + * @param taskId + * @param bucketNumber + * @param tableBlockInfoList + * @param absoluteTableIdentifier + * @return B-Tree size + */ + public static long calculateDriverBTreeSize(String taskId, String bucketNumber, + List tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier) { + // need to sort the block info list for the task in ascending order so + // it will be in sync with the block index read from the file + Collections.sort(tableBlockInfoList); + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + // getting the index file path + // TODO: need to pass the proper partition number when partitions are supported + String carbonIndexFilePath = carbonTablePath + .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(), + bucketNumber); + CarbonFile carbonFile = FileFactory + .getCarbonFile(carbonIndexFilePath, FileFactory.getFileType(carbonIndexFilePath)); + // in case of a carbonIndex file the whole file is metadata, so the complete file size is used. 
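+ // e.g. for a .carbondata block with blockOffset 0 and blockLength 2048 whose footer starts at offset 1800, + // calculateMetaSize above reads the long at byte 2040 (value 1800) and caches 2040 - 1800 = 240 bytes as the meta size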
+ return carbonFile.getSize(); + } + + /** + * This method will clear the B-Tree Cache in executors for the given list of blocks + * + * @param dataBlocks + */ + public static void clearBlockCache(List dataBlocks) { + if (null != dataBlocks) { + for (AbstractIndex blocks : dataBlocks) { + blocks.clear(); + } + } + } /** * Thread to delete the tables * diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java index f6df17520bb..0ceb80fb57c 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java @@ -30,11 +30,15 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.common.logging.impl.StandardLogService; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.datastore.BlockIndexStore; import org.apache.carbondata.core.carbon.datastore.IndexKey; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; -import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; import org.apache.carbondata.core.carbon.metadata.datatype.DataType; import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; @@ -47,6 +51,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.CarbonUtilException; import org.apache.carbondata.scan.executor.QueryExecutor; import org.apache.carbondata.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.scan.executor.infos.AggregatorInfo; @@ -106,15 +111,19 @@ protected void initQuery(QueryModel queryModel) throws QueryExecutionException { // query execution Collections.sort(queryModel.getTableBlockInfos()); // get the table blocks - BlockIndexStore blockLoaderInstance = BlockIndexStore.getInstance(); + CacheProvider cacheProvider = CacheProvider.getInstance(); + BlockIndexStore cache = + (BlockIndexStore) cacheProvider + .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath()); // remove the invalid table blocks, block which is deleted or compacted - blockLoaderInstance.removeTableBlocks(queryModel.getInvalidSegmentIds(), + cache.removeTableBlocks(queryModel.getInvalidSegmentIds(), queryModel.getAbsoluteTableIdentifier()); try { - queryProperties.dataBlocks = blockLoaderInstance - .loadAndGetBlocks(queryModel.getTableBlockInfos(), + List tableBlockUniqueIdentifiers = + prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(), queryModel.getAbsoluteTableIdentifier()); - } catch (IndexBuilderException e) { + queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers); + } catch (CarbonUtilException e) { throw new QueryExecutionException(e); } queryStatistic @@ -170,6 +179,17 
@@ protected void initQuery(QueryModel queryModel) throws QueryExecutionException { queryProperties.sortDimIndexes = new byte[queryModel.getQueryDimension().size()]; } + private List prepareTableBlockUniqueIdentifier( + List tableBlockInfos, AbsoluteTableIdentifier absoluteTableIdentifier) { + List tableBlockUniqueIdentifiers = + new ArrayList<>(tableBlockInfos.size()); + for (TableBlockInfo blockInfo : tableBlockInfos) { + tableBlockUniqueIdentifiers + .add(new TableBlockUniqueIdentifier(absoluteTableIdentifier, blockInfo)); + } + return tableBlockUniqueIdentifiers; + } + /** * Below method will be used to get the key structure info for the query * @@ -470,6 +490,7 @@ private int[] getComplexDimensionParentBlockIndexes(List queryDi * @throws QueryExecutionException */ @Override public void finish() throws QueryExecutionException { + CarbonUtil.clearBlockCache(queryProperties.dataBlocks); if (null != queryProperties.executorService) { queryProperties.executorService.shutdown(); try { diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java index c6e9ff7a47a..ff7a93ca8b7 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -59,6 +59,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { * execution info of the block */ protected List blockExecutionInfos; + /** * file reader which will be used to execute the query */ diff --git a/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java b/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java index 7c5bd446d5b..52cec7d7b06 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java @@ -40,7 +40,7 @@ public class CacheProviderTest { @Before public void setUp() throws Exception { // enable lru cache by setting cache size CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE, "10"); + .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10"); } @Test public void getInstance() throws Exception { diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java index 559f6535533..953e24fa4f0 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java @@ -70,7 +70,7 @@ public class ForwardDictionaryCacheTest extends AbstractDictionaryCacheTest { private void createDictionaryCacheObject() { // enable lru cache by setting cache size CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE, "10"); + .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10"); CacheProvider cacheProvider = CacheProvider.getInstance(); forwardDictionaryCache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, this.carbonStorePath); diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java 
b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java index 3d817b6819e..c526f453be7 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java @@ -72,7 +72,7 @@ public class ReverseDictionaryCacheTest extends AbstractDictionaryCacheTest { private void createDictionaryCacheObject() { // enable lru cache by setting cache size CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE, "10"); + .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10"); CacheProvider cacheProvider = CacheProvider.getInstance(); cacheProvider.dropAllCache(); reverseDictionaryCache = diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java index 722d030ed59..3490917cb32 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java @@ -24,19 +24,22 @@ import java.util.List; import java.util.Map; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.ColumnarFormatVersion; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndex; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.carbon.metadata.blocklet.SegmentInfo; import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.CarbonUtilException; import mockit.Mock; import mockit.MockUp; @@ -56,7 +59,8 @@ public class SegmentTaskIndexStoreTest { private static AbsoluteTableIdentifier absoluteTableIdentifier; @BeforeClass public static void setUp() { - taskIndexStore = SegmentTaskIndexStore.getInstance(); + CacheProvider cacheProvider = CacheProvider.getInstance(); + taskIndexStore = (SegmentTaskIndexStore) cacheProvider.createCache(CacheType.DRIVER_BTREE, ""); tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L, ColumnarFormatVersion.valueOf(version)); absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp", @@ -81,7 +85,7 @@ private List getDataFileFooters() { return footerList; } - @Test public void loadAndGetTaskIdToSegmentsMap() throws IndexBuilderException { + @Test public void loadAndGetTaskIdToSegmentsMap() throws CarbonUtilException { new MockUp() { @Mock String getTaskNo(String carbonDataFileName) { return "100"; @@ -97,23 +101,40 @@ private List getDataFileFooters() { } }; + new MockUp() { + @Mock public String getCarbonIndexFilePath(final String taskId, 
final String partitionId, + final String segmentId, final String bucketNumber) { + return "/src/test/resources"; + } + }; + new MockUp() { @Mock void buildIndex(List footerList) { } }; + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "SG100"); - Map result = - taskIndexStore.loadAndGetTaskIdToSegmentsMap(new HashMap>() {{ + HashMap> segmentToTableBlocksInfos = + new HashMap>() {{ put("SG100", Arrays.asList(tableBlockInfo)); - }}, absoluteTableIdentifier); + }}; + tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); + Map result = + taskIndexStore.get(tableSegmentUniqueIdentifier).getTaskIdToTableSegmentMap(); assertEquals(result.size(), 1); assertTrue(result.containsKey(new SegmentTaskIndexStore.TaskBucketHolder("100", "0"))); } @Test public void checkExistenceOfSegmentBTree() { - Map result = - taskIndexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "SG100"); + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "SG100"); + SegmentTaskIndexWrapper segmentTaskIndexWrapper = + taskIndexStore.getIfPresent(tableSegmentUniqueIdentifier); + Map result = segmentTaskIndexWrapper != null ? + segmentTaskIndexWrapper.getTaskIdToTableSegmentMap() : + null; assertNull(result); } diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java index 6d90a36ebed..5e389ea11fc 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java @@ -29,13 +29,13 @@ public class BlockInfoTest { static BlockInfo blockInfo; @BeforeClass public static void setup() { - blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); + blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); } @Test public void hashCodeTest() { int res = blockInfo.hashCode(); - int expectedResult = -520590451; - assertEquals(res, expectedResult); + int expectedResult = 1694768249; + assertEquals(expectedResult, res); } @Test public void equalsTestwithSameObject() { @@ -45,7 +45,7 @@ public class BlockInfoTest { @Test public void equalsTestWithSimilarObject() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); assert (res); } @@ -62,28 +62,28 @@ public class BlockInfoTest { @Test public void equalsTestWithDifferentSegmentId() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); assert (!res); } @Test public void equalsTestWithDifferentOffset() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); 
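// the block offsets differ (6 vs 62), so the two BlockInfo instances must not be equal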
assert (!res); } @Test public void equalsTestWithDifferentBlockLength() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); assert (!res); } @Test public void equalsTestWithDiffFilePath() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); Boolean res = blockInfoTest.equals(blockInfo); assert (!res); } diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index 14d92482d08..4eed73242b0 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -210,7 +210,7 @@ public void testToDeleteFolderWithInterruptedException() } }; String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath"); - assertEquals(badLogStoreLocation, "../unibi-solutions/system/carbon/badRecords/badLogPath"); + assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath"); } @Test public void testToDeleteFoldersAndFilesForCarbonFileSilently() diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java index b7c48d70c63..38c85fe3ea6 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java @@ -153,7 +153,7 @@ public DataInputStream getDataInputStream(String path, FileFactory.FileType file } }; String[] arr = { "a", "b", "c" }; - TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfo = new TableBlockInfo("/file.carbondata", 3, "id", arr, 3, ColumnarFormatVersion.V1); tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3); List tableBlockInfoList = new ArrayList<>(); tableBlockInfoList.add(tableBlockInfo); @@ -255,7 +255,7 @@ public FileHolder getFileHolder(FileFactory.FileType fileType) { segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols()); dataFileFooter.setNumberOfRows(3); dataFileFooter.setSegmentInfo(segmentInfo); - TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, ColumnarFormatVersion.V1); + TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1); DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info); assertEquals(result.getNumberOfRows(), 3); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java new file mode 100644 index 00000000000..cbd3511b4f2 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.hadoop; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonUtilException; + +/** + * CacheClient : Class used to request the segments cache + */ +public class CacheClient { + /** + * List of segments + */ + private List segmentList = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + /** + * absolute table identifier + */ + private AbsoluteTableIdentifier absoluteTableIdentifier; + + private SegmentTaskIndexStore segmentCache; + + /** + * @param absoluteTableIdentifier + */ + public CacheClient(AbsoluteTableIdentifier absoluteTableIdentifier) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + segmentCache = (SegmentTaskIndexStore) CacheProvider.getInstance() + .createCache(CacheType.DRIVER_BTREE, absoluteTableIdentifier.getStorePath()); + } + + /** + * The method returns the SegmentTaskIndexWrapper from the segments cache + * + * @param tableSegmentUniqueIdentifier + * @return + * @throws CarbonUtilException + */ + public SegmentTaskIndexWrapper getSegmentTaskIndexWrapper( + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws CarbonUtilException { + SegmentTaskIndexWrapper segmentTaskIndexWrapper; + if (null == tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos()) { + segmentTaskIndexWrapper = segmentCache.getIfPresent(tableSegmentUniqueIdentifier); + } else { + segmentTaskIndexWrapper = segmentCache.get(tableSegmentUniqueIdentifier); + } + if (null != segmentTaskIndexWrapper) { + segmentList.add(tableSegmentUniqueIdentifier); + } + return segmentTaskIndexWrapper; + } + + /** + * the method is used to clear access count of the unused segments cacheable object + */ + public void close() { + segmentCache.clear(segmentList); + segmentCache =null; + } + + /** + * The method removes invalid segments from the segment level cache + * + * @param invalidSegments + */ + public void removeInvalidSegments(List invalidSegments) { + segmentCache.removeSegments(invalidSegments, absoluteTableIdentifier); + } +} diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index b69df860f43..2e92842e98a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -32,9 +32,11 @@ import 
org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.IndexKey; import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder; @@ -49,6 +51,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.CarbonUtilException; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; @@ -208,6 +211,7 @@ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration */ @Override public List getSplits(JobContext job) throws IOException { AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + CacheClient cacheClient = new CacheClient(identifier); List invalidSegments = new ArrayList<>(); // get all valid segments and set them into the configuration @@ -222,7 +226,7 @@ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration // remove entry in the segment index if there are invalid segments invalidSegments.addAll(segments.getInvalidSegments()); if (invalidSegments.size() > 0) { - SegmentTaskIndexStore.getInstance().removeTableBlocks(invalidSegments, identifier); + cacheClient.removeInvalidSegments(invalidSegments); } } @@ -234,10 +238,14 @@ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration List splits; try { // do block filtering and get split - splits = getSplits(job, filterInterface); - } catch (IndexBuilderException e) { + splits = getSplits(job, filterInterface, cacheClient); + } catch (IndexBuilderException | CarbonUtilException e) { throw new IOException(e); } + finally { + cacheClient.close(); + cacheClient = null; + } // pass the invalid segment to task side in order to remove index entry in task side if (invalidSegments.size() > 0) { for (InputSplit split : splits) { @@ -272,8 +280,8 @@ private List getSplitsInternal(JobContext job) throws IOException { * @return * @throws IOException */ - private List getSplits(JobContext job, FilterResolverIntf filterResolver) - throws IOException, IndexBuilderException { + private List getSplits(JobContext job, FilterResolverIntf filterResolver, + CacheClient cacheClient) throws IOException, IndexBuilderException, CarbonUtilException { List result = new LinkedList(); @@ -281,12 +289,11 @@ private List getSplits(JobContext job, FilterResolverIntf filterReso AbsoluteTableIdentifier absoluteTableIdentifier = getAbsoluteTableIdentifier(job.getConfiguration()); - //for each segment fetch blocks matching filter in Driver BTree for (String segmentNo : getSegmentsToAccess(job)) { List dataRefNodes = getDataBlocksOfSegment(job, 
filterExpressionProcessor, absoluteTableIdentifier, - filterResolver, segmentNo); + filterResolver, segmentNo, cacheClient); for (DataRefNode dataRefNode : dataRefNodes) { BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode; TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo(); @@ -318,11 +325,12 @@ private Expression getFilterPredicates(Configuration configuration) { private List getDataBlocksOfSegment(JobContext job, FilterExpressionProcessor filterExpressionProcessor, AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, - String segmentId) throws IndexBuilderException, IOException { + String segmentId, CacheClient cacheClient) + throws IndexBuilderException, IOException, CarbonUtilException { QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); QueryStatistic statistic = new QueryStatistic(); Map segmentIndexMap = - getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId); + getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient); List resultFilterredBlocks = new LinkedList(); @@ -391,23 +399,31 @@ private List getTableBlockInfo(JobContext job, String segmentId) * @throws IndexBuilderException */ private Map getSegmentAbstractIndexs( - JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) - throws IOException, IndexBuilderException { - Map segmentIndexMap = - SegmentTaskIndexStore.getInstance() - .getSegmentBTreeIfExists(absoluteTableIdentifier, segmentId); + JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId, + CacheClient cacheClient) throws IOException, IndexBuilderException, CarbonUtilException { + Map segmentIndexMap = null; + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId); + SegmentTaskIndexWrapper segmentTaskIndexWrapper = + cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier); + if (null != segmentTaskIndexWrapper) { + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); + } // if segment tree is not loaded, load the segment tree if (segmentIndexMap == null) { + // List fileStatusList = new LinkedList(); List tableBlockInfoList = getTableBlockInfo(job, segmentId); + // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList); Map> segmentToTableBlocksInfos = new HashMap<>(); segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList); // get Btree blocks for given segment - segmentIndexMap = SegmentTaskIndexStore.getInstance() - .loadAndGetTaskIdToSegmentsMap(segmentToTableBlocksInfos, absoluteTableIdentifier); - + tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); + segmentTaskIndexWrapper = + cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier); + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); } return segmentIndexMap; } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java index 82bcf1c6a4c..dd16e57b4f0 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java @@ -31,9 +31,11 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.IndexKey; 
import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder; @@ -43,6 +45,8 @@ import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtilException; +import org.apache.carbondata.hadoop.CacheClient; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.internal.index.Block; import org.apache.carbondata.hadoop.internal.index.Index; @@ -103,20 +107,33 @@ public List filter(JobContext job, FilterResolverIntf filter) private Map getSegmentAbstractIndexs( JobContext job, AbsoluteTableIdentifier identifier) throws IOException, IndexBuilderException { - Map segmentIndexMap = - SegmentTaskIndexStore.getInstance().getSegmentBTreeIfExists(identifier, segment.getId()); - - // if segment tree is not loaded, load the segment tree - if (segmentIndexMap == null) { - List tableBlockInfoList = getTableBlockInfo(job); - Map> segmentToTableBlocksInfos = new HashMap<>(); - segmentToTableBlocksInfos.put(segment.getId(), tableBlockInfoList); - - // TODO: loadAndGetTaskIdToSegmentsMap can be optimized, use tableBlockInfoList as input - // get Btree blocks for given segment - segmentIndexMap = SegmentTaskIndexStore.getInstance() - .loadAndGetTaskIdToSegmentsMap(segmentToTableBlocksInfos, identifier); - + Map segmentIndexMap = null; + CacheClient cacheClient = new CacheClient(identifier); + TableSegmentUniqueIdentifier segmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(identifier, segment.getId()); + try { + SegmentTaskIndexWrapper segmentTaskIndexWrapper = + cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier); + if (null != segmentTaskIndexWrapper) { + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); + } + // if segment tree is not loaded, load the segment tree + if (segmentIndexMap == null) { + List tableBlockInfoList = getTableBlockInfo(job); + Map> segmentToTableBlocksInfos = new HashMap<>(); + segmentToTableBlocksInfos.put(segment.getId(), tableBlockInfoList); + segmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); + // TODO: loadAndGetTaskIdToSegmentsMap can be optimized, use tableBlockInfoList as input + // get Btree blocks for given segment + segmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier); + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); + } + } + catch (CarbonUtilException e) { + throw new IndexBuilderException(e.getMessage(), e); + } + finally { + cacheClient.close(); } return segmentIndexMap; } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 
99dc853ba84..e740caa7145 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -134,9 +134,6 @@ class CarbonMergerRDD[K, V]( result2 = exec.processTableBlocks() } catch { case e: Throwable => - if (null != exec) { - exec.finish() - } LOGGER.error(e) if (null != e.getMessage) { sys.error(s"Exception occurred in query execution :: ${ e.getMessage }") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index ec6f4562d43..f7f0802c37b 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.hive.CarbonMetastore +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport import org.apache.carbondata.spark.rdd.SparkReadSupport @@ -40,6 +42,7 @@ object CarbonEnv { val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "") carbonEnv = CarbonEnv(catalog) initialized = true + CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") } } diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index a1664a6b9e6..451b95d5c96 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -20,19 +20,20 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File -import org.apache.carbondata.core.carbon.path.CarbonStorePath -import org.apache.spark.sql.Row +import scala.collection.JavaConverters._ + import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest +import org.scalatest.BeforeAndAfterAll + import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper +import org.apache.carbondata.core.carbon.path.CarbonStorePath import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.load.LoadMetadataDetails import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status +import org.apache.carbondata.hadoop.CacheClient import org.apache.carbondata.lcm.status.SegmentStatusManager -import org.scalatest.BeforeAndAfterAll - -import scala.collection.JavaConverters._ /** * FT for compaction scenario where major segment should not be included in minor. 
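 * The added assertions below also check that segment "2" is no longer present in the driver B-Tree cache after clean files.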
@@ -127,7 +128,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
   test("delete merged folder and check segments") {
     // delete merged segments
     sql("clean files for table ignoremajor")
-
+    sql("select * from ignoremajor").show()
     val identifier = new AbsoluteTableIdentifier(
       CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
       new CarbonTableIdentifier(
@@ -140,6 +141,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
     assert(!segments.contains("3"))
+    val cacheClient = new CacheClient(identifier)
+    val segmentIdentifier = new TableSegmentUniqueIdentifier(identifier, "2")
+    val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentIdentifier)
+    assert(null == wrapper)
   }
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 328e33b78ad..136a8f362f7 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -28,13 +29,18 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtilException;
 import org.apache.carbondata.test.util.StoreCreator;
@@ -47,7 +53,8 @@
 public class BlockIndexStoreTest extends TestCase {
-  private BlockIndexStore indexStore;
+  private BlockIndexStore cache;
   private String property;
   @BeforeClass public void setUp() {
@@ -55,7 +62,10 @@ public class BlockIndexStoreTest extends TestCase {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1");
     StoreCreator.createCarbonStore();
-    indexStore = BlockIndexStore.getInstance();
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");
+    CacheProvider cacheProvider = CacheProvider.getInstance();
+    cache = (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE, "");
   }
   @AfterClass public void tearDown() {
@@ -66,7 +76,8 @@ public class BlockIndexStoreTest extends TestCase {
     }
   }
-  @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException {
+  @Test public void testLoadAndGetTaskIdToSegmentsMapForSingleSegment()
+      throws IOException, CarbonUtilException {
     String canonicalPath =
         new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
     File file = getPartFile();
@@ -78,13 +89,26 @@ public class BlockIndexStoreTest extends TestCase {
     AbsoluteTableIdentifier absoluteTableIdentifier =
         new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
     try {
-      List<AbstractIndex> loadAndGetBlocks = indexStore
-          .loadAndGetBlocks(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
+      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifierList =
+          getTableBlockUniqueIdentifierList(
+              Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
+      List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifierList);
       assertTrue(loadAndGetBlocks.size() == 1);
-    } catch (IndexBuilderException e) {
+    } catch (CarbonUtilException e) {
       assertTrue(false);
     }
-    indexStore.clear(absoluteTableIdentifier);
+    List<String> segmentIds = new ArrayList<>();
+    segmentIds.add(info.getSegmentId());
+    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+  }
+
+  private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(
+      List<TableBlockInfo> tableBlockInfos, AbsoluteTableIdentifier absoluteTableIdentifier) {
+    List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new ArrayList<>();
+    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
+      tableBlockUniqueIdentifiers
+          .add(new TableBlockUniqueIdentifier(absoluteTableIdentifier, tableBlockInfo));
+    }
+    return tableBlockUniqueIdentifiers;
   }

   @Test public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently()
@@ -130,16 +154,21 @@ public class BlockIndexStoreTest extends TestCase {
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
-
+    List<TableBlockInfo> tableBlockInfos =
+        Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 });
     try {
-      List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks(
-          Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }),
-          absoluteTableIdentifier);
+      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
+          getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
+      List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers);
       assertTrue(loadAndGetBlocks.size() == 5);
-    } catch (IndexBuilderException e) {
+    } catch (CarbonUtilException e) {
       assertTrue(false);
     }
-    indexStore.clear(absoluteTableIdentifier);
+    List<String> segmentIds = new ArrayList<>();
+    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
+      segmentIds.add(tableBlockInfo.getSegmentId());
+    }
+    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
   }

   @Test public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
@@ -197,15 +226,21 @@ public class BlockIndexStoreTest extends TestCase {
       // TODO Auto-generated catch block
       e.printStackTrace();
     }
+    List<TableBlockInfo> tableBlockInfos = Arrays
+        .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 });
     try {
-      List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks(Arrays
-          .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }),
-          absoluteTableIdentifier);
+      List<TableBlockUniqueIdentifier> blockUniqueIdentifierList =
+          getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
+      List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList);
       assertTrue(loadAndGetBlocks.size() == 8);
-    } catch (IndexBuilderException e) {
+    } catch (CarbonUtilException e) {
       assertTrue(false);
     }
-    indexStore.clear(absoluteTableIdentifier);
+    List<String> segmentIds = new ArrayList<>();
+    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
+      segmentIds.add(tableBlockInfo.getSegmentId());
+    }
+    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
   }

   private class BlockLoaderThread implements Callable<Void> {
@@ -214,13 +249,14 @@ private class BlockLoaderThread implements Callable<Void> {

     public BlockLoaderThread(List<TableBlockInfo> tableBlockInfoList,
         AbsoluteTableIdentifier absoluteTableIdentifier) {
-      // TODO Auto-generated constructor stub
       this.tableBlockInfoList = tableBlockInfoList;
       this.absoluteTableIdentifier = absoluteTableIdentifier;
     }

     @Override public Void call() throws Exception {
-      indexStore.loadAndGetBlocks(tableBlockInfoList, absoluteTableIdentifier);
+      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifierList =
+          getTableBlockUniqueIdentifierList(tableBlockInfoList, absoluteTableIdentifier);
+      cache.getAll(tableBlockUniqueIdentifierList);
       return null;
     }
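The rewritten test above doubles as a bootstrap recipe for the executor-side store. A minimal sketch, assuming a prepared absoluteTableIdentifier and tableBlockInfo as in the test (the empty store path and the size value "10" mirror the test setup, not a recommendation):

    CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");
    BlockIndexStore cache =
        (BlockIndexStore) CacheProvider.getInstance().createCache(CacheType.EXECUTOR_BTREE, "");
    List<TableBlockUniqueIdentifier> keys = new ArrayList<>();
    keys.add(new TableBlockUniqueIdentifier(absoluteTableIdentifier, tableBlockInfo));
    // getAll loads any missing B-Trees and throws CarbonUtilException when the LRU cannot make room
    List<AbstractIndex> indexes = cache.getAll(keys);
    // once a segment disappears (compaction, clean files), drop its entries outright
    cache.removeTableBlocks(Arrays.asList(tableBlockInfo.getSegmentId()), absoluteTableIdentifier);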
From d53feefd60dbedd48ec9582751ac694316cf97f2 Mon Sep 17 00:00:00 2001
From: Venkata Ramana G
Date: Tue, 3 Jan 2017 22:57:21 +0530
Subject: [PATCH 2/2] [CARBONDATA-484] fixed impacted test cases + refactored
 design and fixed review comments

---
 .../apache/carbondata/core/cache/Cache.java    | 14 ++-
 .../carbondata/core/cache/CacheProvider.java   |  5 +-
 .../dictionary/ForwardDictionaryCache.java     | 11 ++-
 .../dictionary/ReverseDictionaryCache.java     |  9 ++
 .../carbon/datastore/BlockIndexStore.java      |  9 ++
 .../datastore/SegmentTaskIndexStore.java       | 21 +---
 .../datastore/SegmentTaskIndexStoreTest.java   |  4 +-
 .../carbondata/hadoop/CacheAccessClient.java   | 99 +++++++++++++++++++
 .../apache/carbondata/hadoop/CacheClient.java  | 73 +++-----------
 .../carbondata/hadoop/CarbonInputFormat.java   | 15 ++-
 .../index/impl/InMemoryBTreeIndex.java         |  6 +-
 .../MajorCompactionIgnoreInMinorTest.scala     |  6 +-
 12 files changed, 177 insertions(+), 95 deletions(-)
 create mode 100644 hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java

diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
index b519deb42da..c87e7d9c2c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
@@ -29,6 +29,7 @@
  * either evicted or manually invalidated.
  * Implementations of this interface are expected to be thread-safe, and can be safely accessed
  * by multiple concurrent threads.
+ * This interface is also responsible for incrementing and decrementing access counts during get operations.
  */
 public interface Cache<K, V> {
@@ -36,6 +37,8 @@ public interface Cache<K, V> {
    * This method will get the value for the given key. If value does not exist
    * for the given key, it will check and load the value.
    *
+   * Access count of Cacheable entry will be incremented.
+   *
    * @param key
    * @return
    * @throws CarbonUtilException in case memory is not sufficient to load data into memory
@@ -45,7 +48,7 @@ public interface Cache<K, V> {
   /**
    * This method will return a list of values for the given list of keys.
    * For each key, this method will check and load the data if required.
-   *
+   * Access count of Cacheable entry will be incremented.
    * @param keys
    * @return
    * @throws CarbonUtilException in case memory is not sufficient to load data into memory
@@ -55,7 +58,7 @@ public interface Cache<K, V> {
   /**
    * This method will return the value for the given key. It will not check and load
    * the data for the given key
-   *
+   * Access count of Cacheable entry will be incremented.
    * @param key
    * @return
    */
@@ -67,5 +70,12 @@ public interface Cache<K, V> {
    * @param key
    */
   void invalidate(K key);
+
+  /**
+   * Access count of Cacheable entry will be decremented.
+   *
+   * @param keys
+   */
+  void clearAccessCount(List<K> keys);
 }
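In short, every successful get, getAll or getIfPresent now pins an entry against LRU eviction, and the pin is only dropped by clearAccessCount. A sketch of the resulting caller obligation, assuming any Cache<K, V> instance obtained from CacheProvider:

    List<K> pinned = new ArrayList<>();
    try {
      V value = cache.get(key);       // access count +1: the entry cannot be evicted
      pinned.add(key);
      // ... use value while it is pinned ...
    } finally {
      cache.clearAccessCount(pinned); // access count -1: the entry becomes evictable again
    }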
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 7d92ca27edf..412f0944f82 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -30,9 +30,7 @@
 import org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache;
 import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
 import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
-import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -129,8 +127,7 @@ private void createDictionaryCacheForGivenType(CacheType cacheType, String carbonStorePath)
           carbonLRUCache);
     } else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
       cacheObject =
-          new SegmentTaskIndexStore(
-              carbonStorePath, carbonLRUCache);
+          new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
     }
     cacheTypeToCacheMap.put(cacheType, cacheObject);
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
index e6fc9d8dc21..ff30c739b20 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
@@ -167,7 +167,7 @@ private Dictionary getDictionary(
       throws CarbonUtilException {
     Dictionary forwardDictionary = null;
     // dictionary is only for primitive data type
-    assert(!dictionaryColumnUniqueIdentifier.getDataType().isComplexType());
+    assert (!dictionaryColumnUniqueIdentifier.getDataType().isComplexType());
     String columnIdentifier = dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId();
     ColumnDictionaryInfo columnDictionaryInfo =
         getColumnDictionaryInfo(dictionaryColumnUniqueIdentifier, columnIdentifier);
@@ -202,4 +202,13 @@ private ColumnDictionaryInfo getColumnDictionaryInfo(
     }
     return columnDictionaryInfo;
   }
+
+  @Override public void clearAccessCount(List<DictionaryColumnUniqueIdentifier> keys) {
+    for (DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier : keys) {
+      Dictionary cacheable = (Dictionary) carbonLRUCache.get(
+          getLruCacheKey(dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId(),
+              CacheType.FORWARD_DICTIONARY));
+      cacheable.clear();
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java
index 2a0cd38e6ac..fab767f0af5 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java
@@ -203,4 +203,13 @@ private ColumnReverseDictionaryInfo getColumnReverseDictionaryInfo(
     }
     return columnReverseDictionaryInfo;
   }
+
+  @Override public void clearAccessCount(List<DictionaryColumnUniqueIdentifier> keys) {
+    for (DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier : keys) {
+      Dictionary cacheable = (Dictionary) carbonLRUCache.get(
+          getLruCacheKey(dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId(),
+              CacheType.REVERSE_DICTIONARY));
+      cacheable.clear();
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index 9b5818fa063..a4523382ac3 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -37,6 +37,7 @@
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.BlockIndex;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
@@ -226,6 +227,14 @@ private String getLruCacheKey(AbsoluteTableIdentifier absoluteTableIdentifier,
         .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
   }
+
+  @Override public void clearAccessCount(List<TableBlockUniqueIdentifier> keys) {
+    for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) {
+      AbstractIndex cacheable = (AbstractIndex) lruCache
+          .get(tableBlockUniqueIdentifier.getUniqueTableBlockName());
+      cacheable.clear();
+    }
+  }
+
   /**
    * Below method will be used to fill the loaded blocks to the array
    * which will be used for query execution
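Note the two cleanup paths this gives BlockIndexStore: clearAccessCount merely unpins entries, while removeTableBlocks evicts them. Reusing the cache and keys variables from the sketch after the BlockIndexStoreTest diff, plus the segmentIds list the tests build:

    // after a query finishes: entries stay cached but become eligible for LRU eviction
    cache.clearAccessCount(keys);
    // after a segment is deleted or compacted away: entries are removed immediately
    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);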
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index 83e06e98aa6..ef0ac627392 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -46,7 +46,7 @@
  * Class to handle loading, unloading, clearing and storing of the table
  * blocks
  */
-public class SegmentTaskIndexStore {
+public class SegmentTaskIndexStore
+    implements Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SegmentTaskIndexStore.class.getName());
@@ -321,28 +321,13 @@ private AbstractIndex loadBlocks(TaskBucketHolder taskBucketHolder,
     return segment;
   }

-  /**
-   * Below method will be used to remove the segment based on
-   * segment id is passed
-   *
-   * @param segmentToBeRemoved      segment to be removed
-   * @param absoluteTableIdentifier absoluteTableIdentifier
-   */
-  public void removeSegments(List<String> segmentToBeRemoved,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    for (String segmentId : segmentToBeRemoved) {
-      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
-          new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-      lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
-    }
-  }
-
   /**
    * The method clears the access count of table segments
    *
    * @param tableSegmentUniqueIdentifiers
    */
-  public void clear(List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+  @Override
+  public void clearAccessCount(List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) {
     for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
       SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
           .get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
index 3490917cb32..8adb8bb7137 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
@@ -60,7 +60,9 @@ public class SegmentTaskIndexStoreTest {
   @BeforeClass public static void setUp() {
     CacheProvider cacheProvider = CacheProvider.getInstance();
-    taskIndexStore = (SegmentTaskIndexStore) cacheProvider.createCache(CacheType.DRIVER_BTREE, "");
+    taskIndexStore =
+        (SegmentTaskIndexStore) cacheProvider.createCache(CacheType.DRIVER_BTREE, "");
     tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L,
         ColumnarFormatVersion.valueOf(version));
     absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp",
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
new file mode 100644
index 00000000000..27600353ae5
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.hadoop;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+/**
+ * CacheAccessClient : a thin wrapper over a Cache that remembers every key it
+ * has returned, so the matching access counts can be released in one close() call
+ */
+public class CacheAccessClient<K, V> {
+  /**
+   * List of segments accessed through this client
+   */
+  private List<K> segmentList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private Cache<K, V> cache;
+
+  public CacheAccessClient(Cache<K, V> cache) {
+    this.cache = cache;
+  }
+
+  /**
+   * This method will return the value for the given key. It will not check and load
+   * the data for the given key
+   *
+   * @param key
+   * @return
+   */
+  public V getIfPresent(K key) {
+    V value = cache.getIfPresent(key);
+    if (value != null) {
+      segmentList.add(key);
+    }
+    return value;
+  }
+
+  /**
+   * This method will get the value for the given key. If value does not exist
+   * for the given key, it will check and load the value.
+   *
+   * @param key
+   * @return
+   * @throws CarbonUtilException in case memory is not sufficient to load data into memory
+   */
+  public V get(K key) throws CarbonUtilException {
+    V value = cache.get(key);
+    if (value != null) {
+      segmentList.add(key);
+    }
+    return value;
+  }
+
+  /**
+   * the method is used to clear access count of the unused segments cacheable object
+   */
+  public void close() {
+    cache.clearAccessCount(segmentList);
+    cache = null;
+  }
+
+  /**
+   * This method will remove the cache for a given key
+   *
+   * @param keys
+   */
+  public void invalidateAll(List<K> keys) {
+    for (K key : keys) {
+      cache.invalidate(key);
+    }
+  }
+
+}
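CacheAccessClient is the piece that makes the pin/unpin bookkeeping automatic: it records every key it returns and releases them all in close(). A usage sketch, assuming a driver-side segmentCache and a key prepared as elsewhere in this patch (the enclosing method is presumed to declare CarbonUtilException):

    CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> client =
        new CacheAccessClient<>(segmentCache);
    try {
      SegmentTaskIndexWrapper wrapper = client.getIfPresent(key); // pinned if non-null
      if (null == wrapper) {
        key.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
        wrapper = client.get(key); // loads the B-Tree and pins it
      }
      // ... use wrapper ...
    } finally {
      client.close(); // one call releases the access count of every key seen above
    }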
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index cbd3511b4f2..1982e2ca9d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -18,79 +18,34 @@
  */
 package org.apache.carbondata.hadoop;

-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtilException;

 /**
- * CacheClient : Class used to request the segments cache
+ * CacheClient : Holds all the Cache access clients for Btree, Dictionary
  */
 public class CacheClient {
-  /**
-   * List of segments
-   */
-  private List<TableSegmentUniqueIdentifier> segmentList =
-      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  /**
-   * absolute table identifier
-   */
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
-  private SegmentTaskIndexStore segmentCache;
+  // segment access client for driver LRU cache
+  private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+      segmentAccessClient;

-  /**
-   * @param absoluteTableIdentifier
-   */
-  public CacheClient(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    this.absoluteTableIdentifier = absoluteTableIdentifier;
-    segmentCache = (SegmentTaskIndexStore) CacheProvider.getInstance()
-        .createCache(CacheType.DRIVER_BTREE, absoluteTableIdentifier.getStorePath());
+  public CacheClient(String storePath) {
+    Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
+        CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE, storePath);
+    segmentAccessClient = new CacheAccessClient<>(segmentCache);
   }

-  /**
-   * The method returns the SegmentTaskIndexWrapper from the segments cache
-   *
-   * @param tableSegmentUniqueIdentifier
-   * @return
-   * @throws CarbonUtilException
-   */
-  public SegmentTaskIndexWrapper getSegmentTaskIndexWrapper(
-      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws CarbonUtilException {
-    SegmentTaskIndexWrapper segmentTaskIndexWrapper;
-    if (null == tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos()) {
-      segmentTaskIndexWrapper = segmentCache.getIfPresent(tableSegmentUniqueIdentifier);
-    } else {
-      segmentTaskIndexWrapper = segmentCache.get(tableSegmentUniqueIdentifier);
-    }
-    if (null != segmentTaskIndexWrapper) {
-      segmentList.add(tableSegmentUniqueIdentifier);
-    }
-    return segmentTaskIndexWrapper;
+  public CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+      getSegmentAccessClient() {
+    return segmentAccessClient;
   }

-  /**
-   * the method is used to clear access count of the unused segments cacheable object
-   */
   public void close() {
-    segmentCache.clear(segmentList);
-    segmentCache = null;
-  }
-
-  /**
-   * The method removes invalid segments from the segment level cache
-   *
-   * @param invalidSegments
-   */
-  public void removeInvalidSegments(List<String> invalidSegments) {
-    segmentCache.removeSegments(invalidSegments, absoluteTableIdentifier);
+    segmentAccessClient.close();
   }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 2e92842e98a..7e9dc7ac94e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -211,7 +211,7 @@ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
    */
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-    CacheClient cacheClient = new CacheClient(identifier);
+    CacheClient cacheClient = new CacheClient(identifier.getStorePath());
     List<String> invalidSegments = new ArrayList<>();

     // get all valid segments and set them into the configuration
@@ -226,7 +226,12 @@ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
       if (invalidSegments.size() > 0) {
-        cacheClient.removeInvalidSegments(invalidSegments);
+        List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+            new ArrayList<>(invalidSegments.size());
+        for (String segId : invalidSegments) {
+          invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+        }
+        cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
       }
     }
@@ -404,8 +409,8 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
     TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-    SegmentTaskIndexWrapper segmentTaskIndexWrapper =
-        cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier);
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper)
+        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
     if (null != segmentTaskIndexWrapper) {
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
     }
@@ -422,7 +427,7 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
       // get Btree blocks for given segment
       tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
       segmentTaskIndexWrapper =
-          cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier);
+          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
     }
     return segmentIndexMap;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index dd16e57b4f0..9d8b13641e3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -108,12 +108,12 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
       JobContext job, AbsoluteTableIdentifier identifier)
       throws IOException, IndexBuilderException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
-    CacheClient cacheClient = new CacheClient(identifier);
+    CacheClient cacheClient = new CacheClient(identifier.getStorePath());
     TableSegmentUniqueIdentifier segmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(identifier, segment.getId());
     try {
       SegmentTaskIndexWrapper segmentTaskIndexWrapper =
-          cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier);
+          cacheClient.getSegmentAccessClient().getIfPresent(segmentUniqueIdentifier);
       if (null != segmentTaskIndexWrapper) {
         segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
       }
@@ -125,7 +125,7 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
       segmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
       // TODO: loadAndGetTaskIdToSegmentsMap can be optimized, use tableBlockInfoList as input
       // get Btree blocks for given segment
-      segmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier);
+      segmentTaskIndexWrapper = cacheClient.getSegmentAccessClient().get(segmentUniqueIdentifier);
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
     }
   }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 451b95d5c96..37ce089d1b3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -141,9 +141,11 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
     assert(!segments.contains("3"))
-    val cacheClient = new CacheClient(identifier)
+    val cacheClient = new CacheClient(
+      CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION))
     val segmentIdentifier = new TableSegmentUniqueIdentifier(identifier, "2")
-    val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentIdentifier)
+    val wrapper: SegmentTaskIndexWrapper =
+      cacheClient.getSegmentAccessClient.getIfPresent(segmentIdentifier)
     assert(null == wrapper)
   }
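Taken together, PATCH 2/2 leaves the driver with one uniform flow per query: invalidate what compaction removed, probe or load what remains, and release the pins. A closing sketch under the same assumptions as the earlier snippets (identifier, invalidSegments and validSegments presumed in scope):

    CacheClient cacheClient = new CacheClient(identifier.getStorePath());
    try {
      // drop B-Tree entries for segments removed by compaction or clean files
      List<TableSegmentUniqueIdentifier> stale = new ArrayList<>();
      for (String segId : invalidSegments) {
        stale.add(new TableSegmentUniqueIdentifier(identifier, segId));
      }
      cacheClient.getSegmentAccessClient().invalidateAll(stale);
      // probe the surviving segments; a miss is loaded via get() once block infos are attached
      for (String segId : validSegments) {
        TableSegmentUniqueIdentifier key = new TableSegmentUniqueIdentifier(identifier, segId);
        SegmentTaskIndexWrapper wrapper = cacheClient.getSegmentAccessClient().getIfPresent(key);
        // ... on miss: key.setSegmentToTableBlocksInfos(...) then getSegmentAccessClient().get(key) ...
      }
    } finally {
      cacheClient.close(); // unpins everything this client touched
    }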