Skip to content

Commit 4c86da9

Browse files
authored
Put CCR tasks on (data && remote cluster clients) (#54146)
Today we assign CCR persistent tasks to nodes with the data role. It could be that the data node is not capable of connecting to remote clusters, in which case the task will fail since it can not connect to the remote cluster with the leader shard. Instead, we need to assign such tasks to nodes that are capable of connecting to remote clusters. This commit addresses this by enabling such persistent tasks to only be assigned to nodes that have the data role, and also have the remote cluster client role.
1 parent db6aba4 commit 4c86da9

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.metadata.AliasMetaData;
3131
import org.elasticsearch.cluster.metadata.IndexMetaData;
3232
import org.elasticsearch.cluster.metadata.MappingMetaData;
33+
import org.elasticsearch.cluster.node.DiscoveryNode;
3334
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3435
import org.elasticsearch.cluster.service.ClusterService;
3536
import org.elasticsearch.common.CheckedConsumer;
@@ -53,6 +54,7 @@
5354
import org.elasticsearch.persistent.AllocatedPersistentTask;
5455
import org.elasticsearch.persistent.PersistentTaskState;
5556
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
57+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
5658
import org.elasticsearch.persistent.PersistentTasksExecutor;
5759
import org.elasticsearch.tasks.TaskId;
5860
import org.elasticsearch.threadpool.Scheduler;
@@ -75,6 +77,7 @@
7577
import java.util.function.Consumer;
7678
import java.util.function.LongConsumer;
7779
import java.util.function.LongSupplier;
80+
import java.util.function.Predicate;
7881
import java.util.function.Supplier;
7982

8083
import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
@@ -114,6 +117,21 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {
114117
}
115118
}
116119

120+
private static final Assignment NO_ASSIGNMENT = new Assignment(null, "no nodes found with data and remote cluster client roles");
121+
122+
@Override
123+
public Assignment getAssignment(final ShardFollowTask params, final ClusterState clusterState) {
124+
final DiscoveryNode node = selectLeastLoadedNode(
125+
clusterState,
126+
((Predicate<DiscoveryNode>) DiscoveryNode::isDataNode).and(DiscoveryNode::isRemoteClusterClient)
127+
);
128+
if (node == null) {
129+
return NO_ASSIGNMENT;
130+
} else {
131+
return new Assignment(node.getId(), "node is the least loaded data node and remote cluster client");
132+
}
133+
}
134+
117135
@Override
118136
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
119137
PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.cluster.ClusterName;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
15+
import org.elasticsearch.cluster.node.DiscoveryNodes;
16+
import org.elasticsearch.cluster.service.ClusterService;
17+
import org.elasticsearch.common.UUIDs;
18+
import org.elasticsearch.common.settings.ClusterSettings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.settings.SettingsModule;
21+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
22+
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.elasticsearch.xpack.ccr.CcrSettings;
25+
26+
import java.util.HashSet;
27+
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.function.BiConsumer;
30+
import java.util.function.Supplier;
31+
32+
import static org.hamcrest.Matchers.equalTo;
33+
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.when;
35+
36+
public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {
37+
38+
public void testAssignmentToNodeWithDataAndRemoteClusterClientRoles() {
39+
runAssignmentTest(
40+
Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE),
41+
randomIntBetween(0, 8),
42+
() -> new HashSet<>(randomSubsetOf(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE))),
43+
(theSpecial, assignment) -> {
44+
assertTrue(assignment.isAssigned());
45+
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
46+
}
47+
);
48+
}
49+
50+
public void testDataRoleWithoutRemoteClusterServiceRole() {
51+
runNoAssignmentTest(Set.of(DiscoveryNodeRole.DATA_ROLE));
52+
}
53+
54+
public void testRemoteClusterClientRoleWithoutDataRole() {
55+
runNoAssignmentTest(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
56+
}
57+
58+
private void runNoAssignmentTest(final Set<DiscoveryNodeRole> roles) {
59+
runAssignmentTest(
60+
roles,
61+
0,
62+
Set::of,
63+
(theSpecial, assignment) -> {
64+
assertFalse(assignment.isAssigned());
65+
assertThat(assignment.getExplanation(), equalTo("no nodes found with data and remote cluster client roles"));
66+
}
67+
);
68+
}
69+
70+
private void runAssignmentTest(
71+
final Set<DiscoveryNodeRole> theSpecialRoles,
72+
final int numberOfOtherNodes,
73+
final Supplier<Set<DiscoveryNodeRole>> otherNodesRolesSupplier,
74+
final BiConsumer<DiscoveryNode, Assignment> consumer
75+
) {
76+
final ClusterService clusterService = mock(ClusterService.class);
77+
when(clusterService.getClusterSettings())
78+
.thenReturn(new ClusterSettings(Settings.EMPTY, Set.of(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT)));
79+
final SettingsModule settingsModule = mock(SettingsModule.class);
80+
when(settingsModule.getSettings()).thenReturn(Settings.EMPTY);
81+
final ShardFollowTasksExecutor executor =
82+
new ShardFollowTasksExecutor(mock(Client.class), mock(ThreadPool.class), clusterService, settingsModule);
83+
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("test"));
84+
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
85+
final DiscoveryNode theSpecial = newNode(theSpecialRoles);
86+
nodesBuilder.add(theSpecial);
87+
for (int i = 0; i < numberOfOtherNodes; i++) {
88+
nodesBuilder.add(newNode(otherNodesRolesSupplier.get()));
89+
}
90+
clusterStateBuilder.nodes(nodesBuilder);
91+
final Assignment assignment = executor.getAssignment(mock(ShardFollowTask.class), clusterStateBuilder.build());
92+
consumer.accept(theSpecial, assignment);
93+
}
94+
95+
private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles) {
96+
return new DiscoveryNode(
97+
"node_" + UUIDs.randomBase64UUID(random()),
98+
buildNewFakeTransportAddress(),
99+
Map.of(),
100+
roles,
101+
Version.CURRENT
102+
);
103+
}
104+
105+
}

0 commit comments

Comments
 (0)