Skip to content

Allow CCR on nodes with legacy roles only #60093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 29, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.node;

import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -172,6 +173,12 @@ public Setting<Boolean> legacySetting() {
public static SortedSet<DiscoveryNodeRole> BUILT_IN_ROLES = Collections.unmodifiableSortedSet(
new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE, REMOTE_CLUSTER_CLIENT_ROLE)));

/**
* The version that {@link #REMOTE_CLUSTER_CLIENT_ROLE} is introduced. Nodes before this version do not have that role even
* they can connect to remote clusters.
*/
public static final Version REMOTE_CLUSTER_CLIENT_ROLE_VERSION = Version.V_7_8_0;

static SortedSet<DiscoveryNodeRole> LEGACY_ROLES =
Collections.unmodifiableSortedSet(new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -124,14 +125,18 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {

@Override
public Assignment getAssignment(final ShardFollowTask params, final ClusterState clusterState) {
final DiscoveryNode node = selectLeastLoadedNode(
clusterState,
DiscoveryNode selectedNode = selectLeastLoadedNode(clusterState,
((Predicate<DiscoveryNode>) DiscoveryNode::isDataNode).and(DiscoveryNode::isRemoteClusterClient)
);
if (node == null) {
if (selectedNode == null) {
// best effort as nodes before 7.8 might not be able to connect to remote clusters
selectedNode = selectLeastLoadedNode(clusterState,
node -> node.isDataNode() && node.getVersion().before(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
}
if (selectedNode == null) {
return NO_ASSIGNMENT;
} else {
return new Assignment(node.getId(), "node is the least loaded data node and remote cluster client");
return new Assignment(selectedNode.getId(), "node is the least loaded data node and remote cluster client");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower but was bootstrapped already; hence is not under the purview of this decider");
}
if (node.node().isRemoteClusterClient() == false) {
return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the "
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
if (node.node().isRemoteClusterClient()) {
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
}
if (node.node().getVersion().before(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION)) {
return allocation.decision(Decision.YES, NAME, "shard is a primary follower and node has only the legacy roles");
}
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the "
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;

Expand All @@ -38,9 +40,13 @@ public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {

public void testAssignmentToNodeWithDataAndRemoteClusterClientRoles() {
runAssignmentTest(
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)),
randomIntBetween(0, 8),
() -> new HashSet<>(randomSubsetOf(new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE)))),
newNode(
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE),
VersionUtils.randomVersion(random())),
newNodes(
between(0, 8),
() -> Sets.newHashSet(randomSubsetOf(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE))),
Version.CURRENT),
(theSpecial, assignment) -> {
assertTrue(assignment.isAssigned());
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
Expand All @@ -56,11 +62,26 @@ public void testRemoteClusterClientRoleWithoutDataRole() {
runNoAssignmentTest(Collections.singleton(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
}

public void testNodeWithLegacyRolesOnly() {
final Version oldVersion = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
runAssignmentTest(
newNode(Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), oldVersion),
newNodes(
between(0, 8),
() -> Sets.newHashSet(randomSubsetOf(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE))),
Version.CURRENT),
(theSpecial, assignment) -> {
assertTrue(assignment.isAssigned());
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
}
);
}

private void runNoAssignmentTest(final Set<DiscoveryNodeRole> roles) {
runAssignmentTest(
roles,
0,
Collections::emptySet,
newNode(roles, Version.CURRENT),
Collections.emptySet(),
(theSpecial, assignment) -> {
assertFalse(assignment.isAssigned());
assertThat(assignment.getExplanation(), equalTo("no nodes found with data and remote cluster client roles"));
Expand All @@ -69,9 +90,8 @@ private void runNoAssignmentTest(final Set<DiscoveryNodeRole> roles) {
}

private void runAssignmentTest(
final Set<DiscoveryNodeRole> theSpecialRoles,
final int numberOfOtherNodes,
final Supplier<Set<DiscoveryNodeRole>> otherNodesRolesSupplier,
final DiscoveryNode targetNode,
final Set<DiscoveryNode> otherNodes,
final BiConsumer<DiscoveryNode, Assignment> consumer
) {
final ClusterService clusterService = mock(ClusterService.class);
Expand All @@ -82,25 +102,30 @@ private void runAssignmentTest(
final ShardFollowTasksExecutor executor =
new ShardFollowTasksExecutor(mock(Client.class), mock(ThreadPool.class), clusterService, settingsModule);
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("test"));
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
final DiscoveryNode theSpecial = newNode(theSpecialRoles);
nodesBuilder.add(theSpecial);
for (int i = 0; i < numberOfOtherNodes; i++) {
nodesBuilder.add(newNode(otherNodesRolesSupplier.get()));
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(targetNode);
for (DiscoveryNode node : otherNodes) {
nodesBuilder.add(node);
}
clusterStateBuilder.nodes(nodesBuilder);
final Assignment assignment = executor.getAssignment(mock(ShardFollowTask.class), clusterStateBuilder.build());
consumer.accept(theSpecial, assignment);
consumer.accept(targetNode, assignment);
}

private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles) {
private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles, final Version version) {
return new DiscoveryNode(
"node_" + UUIDs.randomBase64UUID(random()),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
roles,
Version.CURRENT
version
);
}

private static Set<DiscoveryNode> newNodes(int numberOfNodes, Supplier<Set<DiscoveryNodeRole>> rolesSupplier, Version version) {
Set<DiscoveryNode> nodes = new HashSet<>();
for (int i = 0; i < numberOfNodes; i++) {
nodes.add(newNode(rolesSupplier.get(), version));
}
return nodes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.util.ArrayList;
Expand All @@ -60,6 +61,7 @@
import java.util.List;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -151,9 +153,10 @@ public void testBootstrappingFollowerIndex() {
IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index)
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1).numberOfReplicas(1);
DiscoveryNode dataOnlyNode = newNode("d1", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
DiscoveryNode dataAndRemoteNode = newNode("dr1",
final DiscoveryNode dataOnlyNode = newNode("data_role_only", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
final DiscoveryNode dataAndRemoteNode = newNode("data_and_remote_cluster_client_role",
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
final DiscoveryNode nodeWithLegacyRolesOnly = newNodeWithLegacyRoles("legacy_roles_only");
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(dataOnlyNode).add(dataAndRemoteNode).build();
Metadata metadata = Metadata.builder().put(indexMetadata).build();
RoutingTable.Builder routingTable = RoutingTable.builder()
Expand All @@ -171,6 +174,11 @@ public void testBootstrappingFollowerIndex() {
Decision yesDecision = executeAllocation(clusterState, shardRouting.primaryShard(), dataAndRemoteNode);
assertThat(yesDecision.type(), equalTo(Decision.Type.YES));
assertThat(yesDecision.getExplanation(), equalTo("shard is a primary follower and node has the remote_cluster_client role"));

yesDecision = executeAllocation(clusterState, shardRouting.primaryShard(), nodeWithLegacyRolesOnly);
assertThat(yesDecision.type(), equalTo(Decision.Type.YES));
assertThat(yesDecision.getExplanation(), equalTo("shard is a primary follower and node has only the legacy roles"));

for (ShardRouting replica : shardRouting.replicaShards()) {
assertThat(replica.state(), equalTo(UNASSIGNED));
yesDecision = executeAllocation(clusterState, replica, randomFrom(dataOnlyNode, dataAndRemoteNode));
Expand All @@ -181,6 +189,12 @@ public void testBootstrappingFollowerIndex() {
}
}

static DiscoveryNode newNodeWithLegacyRoles(String id) {
final Version version = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
return new DiscoveryNode(id, buildNewFakeTransportAddress(), emptyMap(), Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), version);
}

static Decision executeAllocation(ClusterState clusterState, ShardRouting shardRouting, DiscoveryNode node) {
final AllocationDecider decider = new CcrPrimaryFollowerAllocationDecider();
final RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Collections.singletonList(decider)),
Expand Down