Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix fs info reporting negative available size ([#11573](https://github.com/opensearch-project/OpenSearch/pull/11573))
- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))
- Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378))
- Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -65,10 +67,13 @@
import java.util.stream.StreamSupport;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.FeatureFlags.TIERED_REMOTE_INDEX;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -1009,6 +1014,26 @@ public void cleanup() throws Exception {
);
}

public void testStartSearchNode() throws Exception {
// test start dedicated search node
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)));
// test start node without search role
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.DATA_ROLE)));
// test start non-dedicated search node with TIERED_REMOTE_INDEX feature enabled
internalCluster().startNode(
Settings.builder()
.put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
.put(TIERED_REMOTE_INDEX, true)
);
// test start non-dedicated search node
assertThrows(
SettingsException.class,
() -> internalCluster().startNode(
Settings.builder().put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
)
);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ public static boolean isSearchNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

public static boolean isDedicatedSearchNode(Settings settings) {
return getRolesFromSettings(settings).stream().allMatch(DiscoveryNodeRole.SEARCH_ROLE::equals);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ private static final int ceilingNextPowerOfTwo(int x) {
private final Weigher<V> weigher;

public SegmentedCache(Builder<K, V> builder) {
this.capacity = builder.capacity;
final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel);
this.segmentMask = segments - 1;
this.table = newSegmentArray(segments);
this.perSegmentCapacity = (capacity + (segments - 1)) / segments;
this.perSegmentCapacity = (builder.capacity + (segments - 1)) / segments;
this.weigher = builder.weigher;
for (int i = 0; i < table.length; i++) {
table[i] = new LRUCache<>(perSegmentCapacity, builder.listener, builder.weigher);
}
this.capacity = perSegmentCapacity * segments;
}

@SuppressWarnings("unchecked")
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/monitor/fs/FsProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ public FsInfo stats(FsInfo previous) throws IOException {
if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) {
paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes());
paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage());
paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized);
// fileCacheFree will be less than zero if the cache being over-subscribed
long fileCacheFree = paths[i].fileCacheReserved - paths[i].fileCacheUtilized;
if (fileCacheFree > 0) {
paths[i].available -= fileCacheFree;
}
// occurs if reserved file cache space is occupied by other files, like local indices
if (paths[i].available < 0) {
paths[i].available = 0;
Expand Down Expand Up @@ -215,4 +219,11 @@ public static FsInfo.Path getFSInfo(NodePath nodePath) throws IOException {
return fsPath;
}

public static long getTotalSize(NodePath nodePath) throws IOException {
return adjustForHugeFilesystems(nodePath.fileStore.getTotalSpace());
}

public static long getAvailableSize(NodePath nodePath) throws IOException {
return adjustForHugeFilesystems(nodePath.fileStore.getUsableSpace());
}
}
86 changes: 53 additions & 33 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.Build;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
Expand Down Expand Up @@ -107,6 +108,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.unit.RatioValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
Expand Down Expand Up @@ -175,7 +177,6 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -371,9 +372,12 @@ public class Node implements Closeable {
}
}, Setting.Property.NodeScope);

public static final Setting<ByteSizeValue> NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting(
private static final String ZERO = "0";

public static final Setting<String> NODE_SEARCH_CACHE_SIZE_SETTING = new Setting<>(
"node.search.cache.size",
ByteSizeValue.ZERO,
s -> (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) || DiscoveryNode.isDedicatedSearchNode(s)) ? "80%" : ZERO,
Node::validateFileCacheSize,
Property.NodeScope
);

Expand Down Expand Up @@ -1997,43 +2001,59 @@ DiscoveryNode getNode() {
* Initializes the search cache with a defined capacity.
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
* Else it configures the size to 80% of total capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
boolean isWritableRemoteIndexEnabled = FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING);
if (DiscoveryNode.isSearchNode(settings) || isWritableRemoteIndexEnabled) {
NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath));
long availableCapacity = info.getAvailable().getBytes();

// Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
if (capacity == 0) {
// If node is not a dedicated search node without configuration, prevent cache initialization
if (!isWritableRemoteIndexEnabled
&& DiscoveryNode.getRolesFromSettings(settings)
.stream()
.anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
} else {
capacity = 80 * availableCapacity / 100;
}
if (DiscoveryNode.isSearchNode(settings) == false && isWritableRemoteIndexEnabled == false) {
return;
}

String capacityRaw = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings);
logger.info("cache size [{}]", capacityRaw);
if (capacityRaw.equals(ZERO)) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
}

NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long totalSpace = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getTotalSize(fileCacheNodePath));
long capacity = calculateFileCacheSize(capacityRaw, totalSpace);
if (capacity <= 0 || totalSpace <= capacity) {
throw new SettingsException("Cache size must be larger than zero and less than total capacity");
}

this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}

private static long calculateFileCacheSize(String capacityRaw, long totalSpace) {
try {
RatioValue ratioValue = RatioValue.parseRatioValue(capacityRaw);
return Math.round(totalSpace * ratioValue.getAsRatio());
} catch (OpenSearchParseException e) {
try {
return ByteSizeValue.parseBytesSizeValue(capacityRaw, NODE_SEARCH_CACHE_SIZE_SETTING.getKey()).getBytes();
} catch (OpenSearchParseException ex) {
ex.addSuppressed(e);
throw ex;
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
}

private static String validateFileCacheSize(String capacityRaw) {
calculateFileCacheSize(capacityRaw, 0L);
return capacityRaw;
}

/**
* Returns the {@link FileCache} instance for remote search node
* Note: Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void createNodePaths() throws IOException {
dataClusterManagerSettings = buildEnvSettings(Settings.EMPTY);
Settings defaultSearchSettings = Settings.builder()
.put(dataClusterManagerSettings)
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB))
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB).toString())
.build();

searchNoDataNoClusterManagerSettings = onlyRole(dataClusterManagerSettings, DiscoveryNodeRole.SEARCH_ROLE);
Expand Down
2 changes: 1 addition & 1 deletion server/src/test/java/org/opensearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void testCreateWithFileCache() throws Exception {
List<Class<? extends Plugin>> plugins = basePlugins();
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
.build();
Settings onlySearchRoleSettings = Settings.builder()
.put(searchRoleSettingsWithConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.NodeRoles.removeRoles;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -216,7 +217,8 @@ public final class InternalTestCluster extends TestCluster {
nodeAndClient.node.settings()
);

private static final ByteSizeValue DEFAULT_SEARCH_CACHE_SIZE = new ByteSizeValue(2, ByteSizeUnit.GB);
private static final String DEFAULT_SEARCH_CACHE_SIZE_BYTES = "2gb";
private static final String DEFAULT_SEARCH_CACHE_SIZE_PERCENT = "5%";

public static final int DEFAULT_LOW_NUM_CLUSTER_MANAGER_NODES = 1;
public static final int DEFAULT_HIGH_NUM_CLUSTER_MANAGER_NODES = 3;
Expand Down Expand Up @@ -700,8 +702,10 @@ public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) {
logger.info("increasing cluster size from {} to {}", size, n);
Set<DiscoveryNodeRole> searchAndDataRoles = Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE);
Settings settings = Settings.builder()
.put(Settings.EMPTY)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), DEFAULT_SEARCH_CACHE_SIZE)
.put(
Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(),
randomBoolean() ? DEFAULT_SEARCH_CACHE_SIZE_PERCENT : DEFAULT_SEARCH_CACHE_SIZE_BYTES
)
.build();
startNodes(n - size, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build());
validateClusterFormed();
Expand Down