Skip to content

Commit

Permalink
[CACRBONDATA-484] Implement LRU cache for B-Tree. This closes apache#454
Browse files Browse the repository at this point in the history
  • Loading branch information
gvramana committed Jan 3, 2017
2 parents cb21480 + d53feef commit b966043
Show file tree
Hide file tree
Showing 34 changed files with 1,533 additions and 566 deletions.
14 changes: 12 additions & 2 deletions core/src/main/java/org/apache/carbondata/core/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
* either evicted or manually invalidated.
* Implementations of this interface are expected to be thread-safe, and can be safely accessed
* by multiple concurrent threads.
* This class also responsible for incrementing and decrementing access count during get operation
*/
public interface Cache<K, V> {

/**
* This method will get the value for the given key. If value does not exist
* for the given key, it will check and load the value.
*
* Access count of Cacheable entry will be incremented
*
* @param key
* @return
* @throws CarbonUtilException in case memory is not sufficient to load data into memory
Expand All @@ -45,7 +48,7 @@ public interface Cache<K, V> {
/**
* This method will return a list of values for the given list of keys.
* For each key, this method will check and load the data if required.
*
* Access count of Cacheable entry will be incremented
* @param keys
* @return
* @throws CarbonUtilException in case memory is not sufficient to load data into memory
Expand All @@ -55,7 +58,7 @@ public interface Cache<K, V> {
/**
* This method will return the value for the given key. It will not check and load
* the data for the given key
*
* Access count of Cacheable entry will be incremented
* @param key
* @return
*/
Expand All @@ -67,5 +70,12 @@ public interface Cache<K, V> {
* @param key
*/
void invalidate(K key);

/**
* Access count of Cacheable entry will be decremented
*
* @param keys
*/
void clearAccessCount(List<K> keys);
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.cache.dictionary.ForwardDictionaryCache;
import org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache;
import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

/**
* Cache provider class which will create a cache based on given type
Expand All @@ -45,15 +52,19 @@ public class CacheProvider {
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

/**
* a map that will hold the mapping of cache type to LRU cache instance
* object lock instance to be used in synchronization block
*/
private Map<CacheType, CarbonLRUCache> cacheTypeToLRUCacheMap =
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
private final Object lock = new Object();
/**
* LRU cache instance
*/
private CarbonLRUCache carbonLRUCache;

/**
* object lock instance to be used in synchronization block
* instance for CacheProvider LOGGER
*/
private final Object lock = new Object();
private static final LogService LOGGER =
LogServiceFactory.getLogService(CacheProvider.class.getName());

/**
* private constructor to follow singleton design pattern for this class
Expand Down Expand Up @@ -85,7 +96,7 @@ public <K, V> Cache<K, V> createCache(CacheType cacheType, String carbonStorePat
if (!dictionaryCacheAlreadyExists(cacheType)) {
synchronized (lock) {
if (!dictionaryCacheAlreadyExists(cacheType)) {
if (null == cacheTypeToLRUCacheMap.get(cacheType)) {
if (null == carbonLRUCache) {
createLRULevelCacheInstance(cacheType);
}
createDictionaryCacheForGivenType(cacheType, carbonStorePath);
Expand All @@ -106,11 +117,17 @@ private void createDictionaryCacheForGivenType(CacheType cacheType, String carbo
if (cacheType.equals(CacheType.REVERSE_DICTIONARY)) {
cacheObject =
new ReverseDictionaryCache<DictionaryColumnUniqueIdentifier, Dictionary>(carbonStorePath,
cacheTypeToLRUCacheMap.get(cacheType));
carbonLRUCache);
} else if (cacheType.equals(CacheType.FORWARD_DICTIONARY)) {
cacheObject =
new ForwardDictionaryCache<DictionaryColumnUniqueIdentifier, Dictionary>(carbonStorePath,
cacheTypeToLRUCacheMap.get(cacheType));
carbonLRUCache);
} else if (cacheType.equals(cacheType.EXECUTOR_BTREE)) {
cacheObject = new BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex>(carbonStorePath,
carbonLRUCache);
} else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
cacheObject =
new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
}
cacheTypeToCacheMap.put(cacheType, cacheObject);
}
Expand All @@ -121,15 +138,25 @@ private void createDictionaryCacheForGivenType(CacheType cacheType, String carbo
* @param cacheType
*/
private void createLRULevelCacheInstance(CacheType cacheType) {
CarbonLRUCache carbonLRUCache = null;
// if cache type is dictionary cache, then same lru cache instance has to be shared
// between forward and reverse cache
if (cacheType.equals(CacheType.REVERSE_DICTIONARY) || cacheType
.equals(CacheType.FORWARD_DICTIONARY)) {
carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE,
CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE_DEFAULT);
cacheTypeToLRUCacheMap.put(CacheType.REVERSE_DICTIONARY, carbonLRUCache);
cacheTypeToLRUCacheMap.put(CacheType.FORWARD_DICTIONARY, carbonLRUCache);
boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
if (isDriver) {
carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE,
CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
} else {
// if executor cache size is not configured then driver cache conf will be used
String executorCacheSize = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE);
if (null != executorCacheSize) {
carbonLRUCache =
new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE,
CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
} else {
LOGGER.info(
"Executor LRU cache size not configured. Initializing with driver LRU cache size.");
carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE,
CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
}
}
}

Expand All @@ -148,7 +175,10 @@ private boolean dictionaryCacheAlreadyExists(CacheType cacheType) {
* Below method will be used to clear the cache
*/
public void dropAllCache() {
cacheTypeToLRUCacheMap.clear();
if(null != carbonLRUCache) {
carbonLRUCache.clear();
carbonLRUCache= null;
}
cacheTypeToCacheMap.clear();
}
}
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;

/**
* class which defines different cache types. cache type can be dictionary cache for
Expand All @@ -41,6 +45,18 @@ public class CacheType<K, V> {
public static final CacheType<DictionaryColumnUniqueIdentifier, Dictionary> REVERSE_DICTIONARY =
new CacheType("reverse_dictionary");

/**
* Executor BTree cache which maintains size of BTree metadata
*/
public static final CacheType<TableBlockUniqueIdentifier, AbstractIndex> EXECUTOR_BTREE =
new CacheType("executor_btree");

/**
* Executor BTree cache which maintains size of BTree metadata
*/
public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
DRIVER_BTREE = new CacheType("driver_btree");

/**
* cacheName which is unique name for a cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public CarbonLRUCache(String propertyName, String defaultPropertyName) {
}
initCache();
if (lruCacheMemorySize > 0) {
LOGGER.info("Configured level cahce size is " + lruCacheMemorySize + " MB");
LOGGER.info("Configured LRU cache size is " + lruCacheMemorySize + " MB");
// convert in bytes
lruCacheMemorySize = lruCacheMemorySize * BYTE_CONVERSION_CONSTANT;
} else {
LOGGER.info("Column cache size not configured. Therefore default behavior will be "
LOGGER.info("LRU cache size not configured. Therefore default behavior will be "
+ "considered and no LRU based eviction of columns will be done");
}
}
Expand Down Expand Up @@ -159,8 +159,10 @@ private void removeKey(String key) {
if (null != cacheable) {
currentSize = currentSize - cacheable.getMemorySize();
}
lruCacheMap.remove(key);
LOGGER.info("Removed level entry from InMemory level lru cache :: " + key);
Cacheable remove = lruCacheMap.remove(key);
if(null != remove) {
LOGGER.info("Removed entry from InMemory lru cache :: " + key);
}
}

/**
Expand All @@ -171,24 +173,53 @@ private void removeKey(String key) {
* @param cacheInfo
*/
public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSize) {
LOGGER.debug("Required size for entry " + columnIdentifier + " :: " + requiredSize
+ " Current cache size :: " + currentSize);
boolean columnKeyAddedSuccessfully = false;
if (freeMemorySizeForAddingCache(requiredSize)) {
if (isLRUCacheSizeConfigured()) {
synchronized (lruCacheMap) {
currentSize = currentSize + requiredSize;
if (null == lruCacheMap.get(columnIdentifier)) {
lruCacheMap.put(columnIdentifier, cacheInfo);
if (freeMemorySizeForAddingCache(requiredSize)) {
currentSize = currentSize + requiredSize;
addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
columnKeyAddedSuccessfully = true;
} else {
LOGGER.error(
"Size not available. Entry cannot be added to lru cache :: " + columnIdentifier
+ " .Required Size = " + requiredSize + " Size available " + (lruCacheMemorySize
- currentSize));
}
columnKeyAddedSuccessfully = true;
}
LOGGER.debug("Added level entry to InMemory level lru cache :: " + columnIdentifier);
} else {
LOGGER.error("Size not available. Column cannot be added to level lru cache :: "
+ columnIdentifier + " .Required Size = " + requiredSize + " Size available "
+ (lruCacheMemorySize - currentSize));
synchronized (lruCacheMap) {
addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
}
columnKeyAddedSuccessfully = true;
}
return columnKeyAddedSuccessfully;
}

/**
* The method will add the cache entry to LRU cache map
*
* @param columnIdentifier
* @param cacheInfo
*/
private void addEntryToLRUCacheMap(String columnIdentifier, Cacheable cacheInfo) {
if (null == lruCacheMap.get(columnIdentifier)) {
lruCacheMap.put(columnIdentifier, cacheInfo);
}
LOGGER.debug("Added entry to InMemory lru cache :: " + columnIdentifier);
}

/**
* this will check whether the LRU cache size is configured
*
* @return <Boolean> value
*/
private boolean isLRUCacheSizeConfigured() {
return lruCacheMemorySize > 0;
}

/**
* This method will check a required column can be loaded into memory or not. If required
* this method will call for eviction of existing data from memory
Expand All @@ -198,24 +229,18 @@ public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSi
*/
private boolean freeMemorySizeForAddingCache(long requiredSize) {
boolean memoryAvailable = false;
if (lruCacheMemorySize > 0) {
if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
memoryAvailable = true;
} else {
// get the keys that can be removed from memory
List<String> keysToBeRemoved = getKeysToBeRemoved(requiredSize);
for (String cacheKey : keysToBeRemoved) {
removeKey(cacheKey);
}
// after removing the keys check again if required size is available
if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
memoryAvailable = true;
} else {
synchronized (lruCacheMap) {
// get the keys that can be removed from memory
List<String> keysToBeRemoved = getKeysToBeRemoved(requiredSize);
for (String cacheKey : keysToBeRemoved) {
removeKey(cacheKey);
}
// after removing the keys check again if required size is available
if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
memoryAvailable = true;
}
}
}
} else {
memoryAvailable = true;
}
return memoryAvailable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private Dictionary getDictionary(
throws CarbonUtilException {
Dictionary forwardDictionary = null;
// dictionary is only for primitive data type
assert(!dictionaryColumnUniqueIdentifier.getDataType().isComplexType());
assert (!dictionaryColumnUniqueIdentifier.getDataType().isComplexType());
String columnIdentifier = dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId();
ColumnDictionaryInfo columnDictionaryInfo =
getColumnDictionaryInfo(dictionaryColumnUniqueIdentifier, columnIdentifier);
Expand Down Expand Up @@ -202,4 +202,13 @@ private ColumnDictionaryInfo getColumnDictionaryInfo(
}
return columnDictionaryInfo;
}

@Override public void clearAccessCount(List<DictionaryColumnUniqueIdentifier> keys) {
for (DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier : keys) {
Dictionary cacheable = (Dictionary) carbonLRUCache.get(
getLruCacheKey(dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId(),
CacheType.FORWARD_DICTIONARY));
cacheable.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,13 @@ private ColumnReverseDictionaryInfo getColumnReverseDictionaryInfo(
}
return columnReverseDictionaryInfo;
}

@Override public void clearAccessCount(List<DictionaryColumnUniqueIdentifier> keys) {
for (DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier : keys) {
Dictionary cacheable = (Dictionary) carbonLRUCache.get(
getLruCacheKey(dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId(),
CacheType.REVERSE_DICTIONARY));
cacheable.clear();
}
}
}
Loading

0 comments on commit b966043

Please sign in to comment.