Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.16] Implement write and read flow for shard diff file. #14922

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add shard-diff path to diff manifest to reduce number of read calls remote store (([#14684](https://github.com/opensearch-project/OpenSearch/pull/14684)))
- Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785)))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand All @@ -32,16 +33,19 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";
private static final String INDEX_NAME_1 = "test-index-1";
BlobPath indexRoutingPath;
AtomicInteger indexRoutingFiles = new AtomicInteger();
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;
Expand Down Expand Up @@ -72,7 +76,13 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));
Expand All @@ -86,7 +96,11 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
assertTrue(indexRoutingFilesAfterUpdate >= indexRoutingFiles.get() + 3);
});

verifyUpdatesInManifestFile(remoteManifestManager);
latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));
Expand All @@ -98,6 +112,42 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
assertTrue(areRoutingTablesSame(routingTableVersions));
}

public void testRemoteRoutingTableEmptyRoutingTableDiff() throws Exception {
prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));

// Update cluster settings
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS))
.get();
assertTrue(response.isAcknowledged());

latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, false);

routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));
}

public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

Expand All @@ -124,10 +174,16 @@ public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
}

public void testRemoteRoutingTableIndexMasterRestart1() throws Exception {
public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
Expand All @@ -153,7 +209,13 @@ public void testRemoteRoutingTableIndexMasterRestart1() throws Exception {
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
}

private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception {
Expand Down Expand Up @@ -208,18 +270,23 @@ private BlobPath getIndexRoutingPath(BlobPath indexRoutingPath, String indexUUID
);
}

private void verifyUpdatesInManifestFile(RemoteManifestManager remoteManifestManager) {
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
private void verifyUpdatesInManifestFile(
Optional<ClusterMetadataManifest> latestManifest,
List<String> expectedIndexNames,
int expectedIndicesRoutingFilesInManifest,
List<String> expectedDeletedIndex,
boolean isRoutingTableDiffFileExpected
) {
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME));
assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty());
assertFalse(manifest.getIndicesRouting().isEmpty());
assertEquals(1, manifest.getIndicesRouting().size());
assertTrue(manifest.getIndicesRouting().get(0).getUploadedFilename().contains(indexRoutingPath.buildAsString()));

assertEquals(expectedIndexNames, manifest.getDiffManifest().getIndicesRoutingUpdated());
assertEquals(expectedDeletedIndex, manifest.getDiffManifest().getIndicesDeleted());
assertEquals(expectedIndicesRoutingFilesInManifest, manifest.getIndicesRouting().size());
for (ClusterMetadataManifest.UploadedIndexMetadata uploadedFilename : manifest.getIndicesRouting()) {
assertTrue(uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString()));
}
assertEquals(isRoutingTableDiffFileExpected, manifest.getDiffManifest().getIndicesRoutingDiffPath() != null);
}

private List<RoutingTable> getRoutingTableFromAllNodes() throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.Diff;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Represents a difference between {@link RoutingTable} objects that can be serialized and deserialized.
*/
public class RoutingTableIncrementalDiff implements Diff<RoutingTable> {

private final Map<String, Diff<IndexRoutingTable>> diffs;

/**
* Constructs a new RoutingTableIncrementalDiff with the given differences.
*
* @param diffs a map containing the differences of {@link IndexRoutingTable}.
*/
public RoutingTableIncrementalDiff(Map<String, Diff<IndexRoutingTable>> diffs) {
this.diffs = diffs;
}

/**
* Gets the map of differences of {@link IndexRoutingTable}.
*
* @return a map containing the differences.
*/
public Map<String, Diff<IndexRoutingTable>> getDiffs() {
return diffs;
}

/**
* Reads a {@link RoutingTableIncrementalDiff} from the given {@link StreamInput}.
*
* @param in the input stream to read from.
* @return the deserialized RoutingTableIncrementalDiff.
* @throws IOException if an I/O exception occurs while reading from the stream.
*/
public static RoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
Map<String, Diff<IndexRoutingTable>> diffs = new HashMap<>();

for (int i = 0; i < size; i++) {
String key = in.readString();
Diff<IndexRoutingTable> diff = IndexRoutingTableIncrementalDiff.readFrom(in);
diffs.put(key, diff);
}
return new RoutingTableIncrementalDiff(diffs);
}

/**
* Applies the differences to the provided {@link RoutingTable}.
*
* @param part the original RoutingTable to which the differences will be applied.
* @return the updated RoutingTable with the applied differences.
*/
@Override
public RoutingTable apply(RoutingTable part) {
RoutingTable.Builder builder = new RoutingTable.Builder();

Check warning on line 73 in server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java#L73

Added line #L73 was not covered by tests
for (IndexRoutingTable indexRoutingTable : part) {
builder.add(indexRoutingTable); // Add existing index routing tables to builder
}

Check warning on line 76 in server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java#L75-L76

Added lines #L75 - L76 were not covered by tests

// Apply the diffs
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
builder.add(entry.getValue().apply(part.index(entry.getKey())));
}

Check warning on line 81 in server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java#L80-L81

Added lines #L80 - L81 were not covered by tests

return builder.build();

Check warning on line 83 in server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java#L83

Added line #L83 was not covered by tests
}

/**
* Writes the differences to the given {@link StreamOutput}.
*
* @param out the output stream to write to.
* @throws IOException if an I/O exception occurs while writing to the stream.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(diffs.size());
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}
}

/**
* Represents a difference between {@link IndexShardRoutingTable} objects that can be serialized and deserialized.
*/
public static class IndexRoutingTableIncrementalDiff implements Diff<IndexRoutingTable> {

private final List<IndexShardRoutingTable> indexShardRoutingTables;

/**
* Constructs a new IndexShardRoutingTableDiff with the given shard routing tables.
*
* @param indexShardRoutingTables a list of IndexShardRoutingTable representing the differences.
*/
public IndexRoutingTableIncrementalDiff(List<IndexShardRoutingTable> indexShardRoutingTables) {
this.indexShardRoutingTables = indexShardRoutingTables;
}

/**
* Applies the differences to the provided {@link IndexRoutingTable}.
*
* @param part the original IndexRoutingTable to which the differences will be applied.
* @return the updated IndexRoutingTable with the applied differences.
*/
@Override
public IndexRoutingTable apply(IndexRoutingTable part) {
IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex());
for (IndexShardRoutingTable shardRoutingTable : part) {
builder.addIndexShard(shardRoutingTable); // Add existing shards to builder
}

// Apply the diff: update or add the new shard routing tables
for (IndexShardRoutingTable diffShard : indexShardRoutingTables) {
builder.addIndexShard(diffShard);
}
return builder.build();
}

/**
* Writes the differences to the given {@link StreamOutput}.
*
* @param out the output stream to write to.
* @throws IOException if an I/O exception occurs while writing to the stream.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexShardRoutingTables.size());
for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) {
IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out);
}
}

/**
* Reads a {@link IndexRoutingTableIncrementalDiff} from the given {@link StreamInput}.
*
* @param in the input stream to read from.
* @return the deserialized IndexShardRoutingTableDiff.
* @throws IOException if an I/O exception occurs while reading from the stream.
*/
public static IndexRoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
List<IndexShardRoutingTable> indexShardRoutingTables = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
indexShardRoutingTables.add(shardRoutingTable);
}
return new IndexRoutingTableIncrementalDiff(indexShardRoutingTables);
}
}
}
Loading
Loading