Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.function.ToLongBiFunction;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORAGE_PATH;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
Expand All @@ -76,6 +77,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

static final String ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE = "Segment count cannot be less than one for tiered cache";
static final String INVALID_STORAGE_PATH_EXCEPTION_MESSAGE = "Storage path for disk cache cannot be null or empty";

// In future we want to just read the stats from the individual tiers' statsHolder objects, but this isn't
// possible right now because of the way computeIfAbsent is implemented.
Expand All @@ -85,6 +87,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private final int numberOfSegments;

final TieredSpilloverCacheSegment<K, V>[] tieredSpilloverCacheSegments;
final String diskCacheStoragePath;

/**
* This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
Expand All @@ -99,6 +102,10 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
if (builder.numberOfSegments <= 0) {
throw new IllegalArgumentException(ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE);
}
if (builder.diskCacheStoragePath == null || builder.diskCacheStoragePath.isBlank()) {
throw new IllegalArgumentException(INVALID_STORAGE_PATH_EXCEPTION_MESSAGE);
}
this.diskCacheStoragePath = builder.diskCacheStoragePath;
this.numberOfSegments = builder.numberOfSegments;
Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings());
this.dimensionNames = builder.cacheConfig.getDimensionNames();
Expand Down Expand Up @@ -168,7 +175,6 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Objects.requireNonNull(builder.cacheConfig, "cache config can't be null");
Objects.requireNonNull(builder.cacheConfig.getClusterSettings(), "cluster settings can't be null");
Objects.requireNonNull(builder.cacheConfig.getStoragePath(), "Storage path shouldn't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");
this.statsHolder = statsHolder;

Expand Down Expand Up @@ -205,7 +211,7 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
.setSegmentCount(1) // We don't need to make underlying caches multi-segmented
.setStatsTrackingEnabled(false)
.setMaxSizeInBytes(diskCacheSizeInBytes)
.setStoragePath(builder.cacheConfig.getStoragePath() + "/" + segmentNumber)
.setStoragePath(builder.diskCacheStoragePath + "/" + segmentNumber)
.setCacheAlias("tiered_disk_cache#" + segmentNumber)
.build(),
builder.cacheType,
Expand Down Expand Up @@ -561,6 +567,11 @@ public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
}
}

// Package private for testing.
String getDiskCacheStoragePath() {
return this.diskCacheStoragePath;
}

// Package private for testing.
void enableDisableDiskCache(Boolean isDiskCacheEnabled) {
for (int iter = 0; iter < this.numberOfSegments; iter++) {
Expand Down Expand Up @@ -818,6 +829,17 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,

int numberOfSegments = TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(cacheType.getSettingPrefix()).get(settings);

String storagePath = TIERED_SPILLOVER_DISK_STORAGE_PATH.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
.get(settings);
// If we read the storage path directly from the setting, we have to add the segment number at the end.
if (storagePath == null || storagePath.isBlank()) {
// In case storage path is not explicitly set by user, use default path.
if (config.getStoragePath() == null || config.getStoragePath().isBlank()) {
throw new IllegalArgumentException(INVALID_STORAGE_PATH_EXCEPTION_MESSAGE);
}
storagePath = config.getStoragePath();
}

if (!VALID_SEGMENT_COUNT_VALUES.contains(numberOfSegments)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE, TIERED_SPILLOVER_CACHE_NAME)
Expand All @@ -839,6 +861,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType))
.setOnHeapCacheSizeInBytes(onHeapCacheSize)
.setDiskCacheSize(diskCacheSize)
.setDiskCacheStoragePath(storagePath)
.build();
}

Expand All @@ -865,6 +888,7 @@ public static class Builder<K, V> {
private int numberOfSegments;
private long onHeapCacheSizeInBytes;
private long diskCacheSizeInBytes;
private String diskCacheStoragePath;

/**
* Default constructor
Expand Down Expand Up @@ -972,7 +996,7 @@ public Builder<K, V> setOnHeapCacheSizeInBytes(long onHeapCacheSizeInBytes) {
}

/**
* Sets disk cache siz
* Sets disk cache size
* @param diskCacheSizeInBytes size of diskCache in bytes
* @return buider
*/
Expand All @@ -981,6 +1005,16 @@ public Builder<K, V> setDiskCacheSize(long diskCacheSizeInBytes) {
return this;
}

/**
* Sets disk cache storage path.
* @param diskCacheStoragePath storage path for disk cache
* @return builder
*/
public Builder<K, V> setDiskCacheStoragePath(String diskCacheStoragePath) {
this.diskCacheStoragePath = diskCacheStoragePath;
return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public List<Setting<?>> getSettings() {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORAGE_PATH.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ public class TieredSpilloverCacheSettings {
(key) -> Setting.longSetting(key, DEFAULT_DISK_CACHE_SIZE_IN_BYTES, MIN_DISK_CACHE_SIZE_IN_BYTES, NodeScope)
);

/**
* Storage path for disk cache.
*/
public static final Setting.AffixSetting<String> TIERED_SPILLOVER_DISK_STORAGE_PATH = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.storage.path",
(key) -> Setting.simpleString(key, "", NodeScope)
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,11 @@ public static class MockDiskCacheFactory implements Factory {
final long delay;
final int maxSize;
final boolean statsTrackingEnabled;
final int keyValueSize;

public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnabled, int keyValueSize) {
public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnabled) {
this.delay = delay;
this.maxSize = maxSize;
this.statsTrackingEnabled = statsTrackingEnabled;
this.keyValueSize = keyValueSize;
}

@Override
Expand Down
Loading
Loading