Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into wait_until
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishikesh1159 authored Feb 24, 2023
2 parents 055f225 + 95bd076 commit 36f3851
Show file tree
Hide file tree
Showing 28 changed files with 627 additions and 121 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add GeoTile and GeoHash Grid aggregations on GeoShapes. ([#5589](https://github.com/opensearch-project/OpenSearch/pull/5589))
- Disallow multiple data paths for search nodes ([#6427](https://github.com/opensearch-project/OpenSearch/pull/6427))

### Dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,17 +419,6 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {
assertAcked(
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
);

ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertFalse(clusterHealthResponse.isTimedOut());
ensureGreen(INDEX_NAME);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(20, primary, replica);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
Expand All @@ -29,6 +31,7 @@
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.repositories.fs.FsRepository;

Expand Down Expand Up @@ -413,4 +416,49 @@ private void assertIndexDirectoryDoesNotExist(String... indexNames) {
}
}
}

public void testFileCacheStats() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName1 = "test-idx-1";
final Client client = client();
final int numNodes = 2;

internalCluster().ensureAtLeastNumSearchNodes(numNodes);
createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName1);
deleteIndicesAndEnsureGreen(client, indexName1);
assertAllNodesFileCacheEmpty();

restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertNodesFileCacheNonEmpty(numNodes);
}

private void assertAllNodesFileCacheEmpty() {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcstats = stats.getFileCacheStats();
assertNotNull(fcstats);
assertTrue(isFileCacheEmpty(fcstats));
}
}

private void assertNodesFileCacheNonEmpty(int numNodes) {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
int nonEmptyFileCacheNodes = 0;
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcstats = stats.getFileCacheStats();
assertNotNull(fcstats);
if (!isFileCacheEmpty(fcstats)) {
nonEmptyFileCacheNodes++;
}
}
assertEquals(numNodes, nonEmptyFileCacheNodes);
}

private boolean isFileCacheEmpty(FileCacheStats stats) {
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.breaker.AllCircuitBreakerStats;
import org.opensearch.ingest.IngestStats;
Expand Down Expand Up @@ -130,6 +132,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private WeightedRoutingStats weightedRoutingStats;

@Nullable
private FileCacheStats fileCacheStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -171,6 +176,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
weightedRoutingStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0) && FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
fileCacheStats = in.readOptionalWriteable(FileCacheStats::new);
} else {
fileCacheStats = null;
}
}

public NodeStats(
Expand All @@ -194,7 +204,8 @@ public NodeStats(
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -217,6 +228,7 @@ public NodeStats(
this.searchBackpressureStats = searchBackpressureStats;
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -340,6 +352,10 @@ public WeightedRoutingStats getWeightedRoutingStats() {
return weightedRoutingStats;
}

public FileCacheStats getFileCacheStats() {
return fileCacheStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -374,6 +390,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeOptionalWriteable(weightedRoutingStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0) && FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
out.writeOptionalWriteable(fileCacheStats);
}
}

@Override
Expand Down Expand Up @@ -455,6 +474,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getWeightedRoutingStats() != null) {
getWeightedRoutingStats().toXContent(builder, params);
}
if (getFileCacheStats() != null && FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
getFileCacheStats().toXContent(builder, params);
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ public enum Metric {
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEARCH_BACKPRESSURE("search_backpressure"),
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing");
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics)
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/opensearch/bootstrap/BootstrapChecks.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@
import org.apache.lucene.util.Constants;
import org.opensearch.bootstrap.jvm.DenyJvmVersionsParser;
import org.opensearch.cluster.coordination.ClusterBootstrapService;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.transport.BoundTransportAddress;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.discovery.DiscoveryModule;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexModule;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.NodeValidationException;

import java.io.BufferedReader;
Expand Down Expand Up @@ -228,6 +231,7 @@ static List<BootstrapCheck> checks() {
checks.add(new JavaVersionCheck());
checks.add(new AllPermissionCheck());
checks.add(new DiscoveryConfiguredCheck());
checks.add(new MultipleDataPathCheck());
return Collections.unmodifiableList(checks);
}

Expand Down Expand Up @@ -751,4 +755,25 @@ public BootstrapCheckResult check(BootstrapContext context) {
);
}
}

/**
* Bootstrap check that if a search node contains multiple data paths
*/
static class MultipleDataPathCheck implements BootstrapCheck {

@Override
public BootstrapCheckResult check(BootstrapContext context) {
if (NodeRoleSettings.NODE_ROLES_SETTING.get(context.settings()).contains(DiscoveryNodeRole.SEARCH_ROLE)
&& Environment.PATH_DATA_SETTING.get(context.settings()).size() > 1) {
return BootstrapCheckResult.failure("Multiple data paths are not allowed for search nodes");
}
return BootstrapCheckResult.success();
}

@Override
public final boolean alwaysEnforce() {
return true;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
return threadPool.executor(ThreadPool.Names.SNAPSHOT).submit(() -> {
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath);
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
TransferManager transferManager = new TransferManager(
blobContainer,
threadPool.executor(ThreadPool.Names.SEARCH),
remoteStoreFileCache
);
TransferManager transferManager = new TransferManager(blobContainer, remoteStoreFileCache);
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.opensearch.index.store.remote.utils.BlobFetchRequest;
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
Expand Down Expand Up @@ -147,9 +145,10 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
.fileName(blockFileName)
.build();
try {
return transferManager.asyncFetchBlob(blobFetchRequest).get();
} catch (InterruptedException | ExecutionException e) {
logger.error(() -> new ParameterizedMessage("unexpected failure while fetching [{}]", blobFetchRequest), e);
return transferManager.fetchBlob(blobFetchRequest);
} catch (InterruptedException e) {
logger.error("Interrupted while fetching [{}]", blobFetchRequest);
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
Expand Down
Loading

0 comments on commit 36f3851

Please sign in to comment.