Skip to content

Commit

Permalink
fix the existing repo lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Oct 21, 2024
1 parent c23f86e commit 75032b7
Showing 1 changed file with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
Expand All @@ -57,6 +58,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -185,19 +187,29 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);

Optional<DiscoveryNode> remotePublicationDN = currentNodes.getNodes()
.values()
.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();
RepositoriesMetadata existingrepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE);
Map<String, RepositoryMetadata> repositories = new LinkedHashMap<>();
if (existingrepositoriesMetadata != null) {
existingrepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remoteDN.isPresent()) {
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
remoteDN.get(),
existingrepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remotePublicationDN.isPresent()) {
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(remotePublicationDN.get(), repositoriesMetadata);
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
remotePublicationDN.get(),
existingrepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}

assert nodesBuilder.isLocalNodeElectedClusterManager();
Expand Down Expand Up @@ -228,17 +240,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
ensureNodeCommissioned(node, currentState.metadata());
nodesBuilder.add(node);

if (remoteDN.isEmpty() && node.isRemoteStoreNode()) {
if ((remoteDN.isEmpty() && node.isRemoteStoreNode())
|| (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled())) {
// This is hit only on cases where we encounter first remote node
logger.info("Updating system repository now for remote store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata);
}
if (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled()) {
// This is hit only on cases where we encounter first remote publication node
logger.info("Updating system repository now for remote publication store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata);
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
existingrepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand All @@ -252,7 +263,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
}
results.success(joinTask);
}

RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values()));
if (nodesChanged) {
rerouteService.reroute(
"post-join reroute",
Expand Down

0 comments on commit 75032b7

Please sign in to comment.