Skip to content

Commit

Permalink
create publication repos during join task
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Oct 18, 2024
1 parent 0bded88 commit f54d4f3
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,13 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
Optional<DiscoveryNode> remotePublicationDN = currentNodes.getNodes()
.values()
.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();
DiscoveryNode dn = remoteDN.or(() -> remotePublicationDN)
.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
Expand Down Expand Up @@ -226,6 +232,13 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
} else if (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled()) {
// This is hit only on cases where we encounter first remote node
logger.info("Updating system repository now for remote publication store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
}

nodesChanged = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void createAndVerifyRepositories(DiscoveryNode localNode) {
* node repository metadata an exception will be thrown and the node will not be allowed to join the cluster.
*/
public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) {
if (joiningNode.isRemoteStoreNode()) {
if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) {
List<RepositoryMetadata> updatedRepositoryMetadataList = new ArrayList<>();
List<RepositoryMetadata> newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata()
.repositories();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,132 @@ public void testUpdatesClusterStateWithMultiNodeClusterAndSameRepository() throw
validateRepositoryMetadata(result.resultingState, clusterManagerNode, 2);
}

public void testUpdatesClusterStateRemoteNodeJoinPublicationCluster() throws Exception {
final AllocationService allocationService = mock(AllocationService.class);
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(
new SetOnce<>(mock(RepositoriesService.class))::get,
null
);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
Settings.EMPTY,
allocationService,
logger,
rerouteService,
remoteStoreNodeService
);

final DiscoveryNode clusterManagerNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remotePublicationNodeAttributes(),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
DiscoveryNodes.builder()
.add(clusterManagerNode)
.localNodeId(clusterManagerNode.getId())
.clusterManagerNodeId(clusterManagerNode.getId())
)
.build();

final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
clusterState,
List.of(new JoinTaskExecutor.Task(clusterManagerNode, "elect leader"))
);
assertThat(result.executionResults.entrySet(), hasSize(1));
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
assertTrue(taskResult.isSuccess());
validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);

final Settings settings = Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
.build();
final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
Metadata metadata = Metadata.builder().persistentSettings(settings).build();

ClusterState currentState = ClusterState.builder(result.resultingState).metadata(metadata).build();

final DiscoveryNode remoteStoreNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> resultAfterRemoteNodeJoin = joinTaskExecutor.execute(
currentState,
List.of(new JoinTaskExecutor.Task(remoteStoreNode, "test"))
);
assertThat(resultAfterRemoteNodeJoin.executionResults.entrySet(), hasSize(1));
final ClusterStateTaskExecutor.TaskResult taskResult1 = resultAfterRemoteNodeJoin.executionResults.values().iterator().next();
assertTrue(taskResult1.isSuccess());
validateRepositoryMetadata(resultAfterRemoteNodeJoin.resultingState, remoteStoreNode, 3);
}

public void testUpdatesClusterStateWithMultiplePublicationNodeJoin() throws Exception {
Map<String, String> remoteStoreNodeAttributes = remotePublicationNodeAttributes();
final AllocationService allocationService = mock(AllocationService.class);
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(
new SetOnce<>(mock(RepositoriesService.class))::get,
null
);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
Settings.EMPTY,
allocationService,
logger,
rerouteService,
remoteStoreNodeService
);

final DiscoveryNode clusterManagerNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>();

final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
DiscoveryNodes.builder()
.add(clusterManagerNode)
.localNodeId(clusterManagerNode.getId())
.clusterManagerNodeId(clusterManagerNode.getId())
)
.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata)))
.build();

final DiscoveryNode joiningNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
clusterState,
List.of(new JoinTaskExecutor.Task(joiningNode, "test"))
);
assertThat(result.executionResults.entrySet(), hasSize(1));
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
assertTrue(taskResult.isSuccess());
validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);
}

public void testNodeJoinInMixedMode() {
List<Version> versions = allOpenSearchVersions();
assert versions.size() >= 2 : "test requires at least two open search versions";
Expand Down Expand Up @@ -1212,6 +1338,20 @@ private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode
}
}

private void validatePublicationRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode) throws Exception {
final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE);
assertTrue(repositoriesMetadata.repositories().size() == 2);
final RepositoryMetadata clusterStateRepoMetadata = buildRepositoryMetadata(existingNode, CLUSTER_STATE_REPO);
final RepositoryMetadata routingTableRepoMetadata = buildRepositoryMetadata(existingNode, ROUTING_TABLE_REPO);
for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) {
if (repositoryMetadata.name().equals(clusterStateRepoMetadata.name())) {
assertTrue(clusterStateRepoMetadata.equalsIgnoreGenerations(repositoryMetadata));
} else if (repositoryMetadata.name().equals(routingTableRepoMetadata.name())) {
assertTrue(routingTableRepoMetadata.equalsIgnoreGenerations(repositoryMetadata));
}
}
}

private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
return new DiscoveryNode(
randomAlphaOfLength(10),
Expand Down

0 comments on commit f54d4f3

Please sign in to comment.