Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void testRetryForAppendOnlyIndices() throws Exception {
Client client = internalCluster().coordOnlyNodeClient();
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
NodeStats unluckyNode = randomFrom(
nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode()).collect(Collectors.toList())
nodeStats.getNodes().stream().filter((s) -> s.getNode().canContainData()).collect(Collectors.toList())
);
assertAcked(
client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
dataNodeStats.add(stat);
}
}
Expand Down Expand Up @@ -433,7 +433,7 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
dataNodeStats.add(stat);
}
}
Expand Down Expand Up @@ -698,7 +698,7 @@ public void testPrimaryCorruptionDuringReplicationDoesNotFailReplicaShard() thro
final NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
final List<NodeStats> dataNodeStats = nodeStats.getNodes()
.stream()
.filter(stat -> stat.getNode().isDataNode())
.filter(stat -> stat.getNode().canContainData())
.collect(Collectors.toUnmodifiableList());
MatcherAssert.assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testRetryDueToExceptionOnNetworkLayer() throws ExecutionException, I
Client client = internalCluster().coordOnlyNodeClient();
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
NodeStats unluckyNode = randomFrom(
nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode()).collect(Collectors.toList())
nodeStats.getNodes().stream().filter((s) -> s.getNode().canContainData()).collect(Collectors.toList())
);
assertAcked(
client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ private static RequestCacheStats getRequestCacheStats(Client client, String inde
private static RequestCacheStats getNodeCacheStats(Client client) {
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : stats.getNodes()) {
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
return stat.getIndices().getRequestCache();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ private static RequestCacheStats getRequestCacheStats(Client client, String inde
private static RequestCacheStats getNodeCacheStats(Client client) {
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : stats.getNodes()) {
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
return stat.getIndices().getRequestCache();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void testLimitsRequestSize() {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
dataNodeStats.add(stat);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public void testRerouteRecovery() throws Exception {
.get();
List<NodeStats> dataNodeStats = statsResponse1.getNodes()
.stream()
.filter(nodeStats -> nodeStats.getNode().isDataNode())
.filter(nodeStats -> nodeStats.getNode().canContainData())
.collect(Collectors.toList());
assertThat(dataNodeStats, hasSize(2));
for (NodeStats nodeStats : dataNodeStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void testCloseWhileRelocatingShards() throws Exception {
);

for (DiscoveryNode node : state.getNodes()) {
if (node.isDataNode() && node.getName().equals(targetNode) == false) {
if (node.canContainData() && node.getName().equals(targetNode) == false) {
final TransportService sourceTransportService = internalCluster().getInstance(TransportService.class, node.getName());
targetTransportService.addSendBehavior(sourceTransportService, sendBehavior);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testCancelRecoveryAndResume() throws Exception {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
dataNodeStats.add(stat);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testZeroRemoteStatsOnNodesStatsForClusterManager() {

assertTrue(
nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isClusterManagerNode()
&& !nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isDataNode()
&& !nodesStatsResponseForClusterManager.getNodes().get(0).getNode().canContainData()
);
assertZeroRemoteSegmentStats(
nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats()
Expand All @@ -127,7 +127,7 @@ public void testZeroRemoteStatsOnNodesStatsForClusterManager() {
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true))
.get();

assertTrue(nodesStatsResponseForDataNode.getNodes().get(0).getNode().isDataNode());
assertTrue(nodesStatsResponseForDataNode.getNodes().get(0).getNode().canContainData());
RemoteSegmentStats remoteSegmentStats = nodesStatsResponseForDataNode.getNodes()
.get(0)
.getIndices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ private void assertNoSearchInAZ(String az) {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
if (stat.getNode().getId().equals(dataNodeId)) {
assertEquals(0, searchStats.getQueryCount());
assertEquals(0, searchStats.getFetchCount());
Expand All @@ -870,7 +870,7 @@ private void assertSearchInAZ(String az) {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (stat.getNode().isDataNode()) {
if (stat.getNode().canContainData()) {
if (stat.getNode().getId().equals(dataNodeId)) {
Assert.assertTrue(searchStats.getFetchCount() > 0L || searchStats.getQueryCount() > 0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ private void executeHealth(
} else {
DiscoveryNode localNode = currentState.getNodes().getLocalNode();
// TODO: make this check more generic, check for node role instead
if (localNode.isDataNode()) {
if (localNode.canContainData()) {
assert request.local() == true : "local node request false for request for local node weighed in";
boolean weighedAway = WeightedRoutingUtils.isWeighedAway(localNode.getId(), currentState);
if (weighedAway) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ public void clusterChanged(ClusterChangedEvent event) {

// Refresh if a data node was added
for (DiscoveryNode addedNode : event.nodesDelta().addedNodes()) {
if (addedNode.isDataNode()) {
if (addedNode.canContainData()) {
executeRefresh(event.state(), "data node added");
break;
}
}

// Clean up info for any removed nodes
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
if (removedNode.isDataNode()) {
if (removedNode.canContainData()) {
logger.trace("Removing node from cluster info: {}", removedNode.getId());
if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) {
Map<String, DiskUsage> newMaxUsages = new HashMap<>(leastAvailableSpaceUsages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class DiscoveryNode implements VerifiableWriteable, ToXContentFragment {

public static boolean nodeRequiresLocalStorage(Settings settings) {
boolean localStorageEnable = Node.NODE_LOCAL_STORAGE_SETTING.get(settings);
if (localStorageEnable == false && (isDataNode(settings) || isClusterManagerNode(settings))) {
if (localStorageEnable == false && (canContainData(settings) || isClusterManagerNode(settings))) {
// TODO: make this a proper setting validation logic, requiring multi-settings validation
throw new IllegalArgumentException("storage can not be disabled for cluster-manager and data nodes");
}
Expand Down Expand Up @@ -110,10 +110,14 @@ public static boolean isClusterManagerNode(Settings settings) {
* not all roles may be available from a static/initializing context such as a {@link Setting}
* default value function. In that case, be warned that this may not include all plugin roles.
*/
public static boolean isDataNode(final Settings settings) {
public static boolean canContainData(final Settings settings) {
return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData);
}

public static boolean isDataNode(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.DATA_ROLE);
}

public static boolean isIngestNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.INGEST_ROLE);
}
Expand Down Expand Up @@ -452,10 +456,14 @@ public Map<String, String> getAttributes() {
/**
* Should this node hold data (shards) or not.
*/
public boolean isDataNode() {
public boolean canContainData() {
return roles.stream().anyMatch(DiscoveryNodeRole::canContainData);
}

public boolean isDataNode() {
return roles.contains(DiscoveryNodeRole.DATA_ROLE);
}

/**
* Can this node become cluster-manager or not.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public DiscoveryNodes build() {
clusterManagerNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
}
final Version version = nodeEntry.getValue().getVersion();
if (nodeEntry.getValue().isDataNode() || nodeEntry.getValue().isClusterManagerNode()) {
if (nodeEntry.getValue().canContainData() || nodeEntry.getValue().isClusterManagerNode()) {
if (minNonClientNodeVersion == null) {
minNonClientNodeVersion = version;
maxNonClientNodeVersion = version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public String node() {
* Handle case where a disco node cannot be found in the routing table. Usually means that it's not a data node.
*/
protected RerouteExplanation explainOrThrowMissingRoutingNode(RoutingAllocation allocation, boolean explain, DiscoveryNode discoNode) {
if (!discoNode.isDataNode()) {
if (!discoNode.canContainData()) {
return explainOrThrowRejectedCommand(explain, allocation, "allocation can only be done on data nodes, not [" + node + "]");
} else {
return explainOrThrowRejectedCommand(explain, allocation, "could not find [" + node + "] among the routing nodes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)

boolean found = false;
RoutingNode fromRoutingNode = allocation.routingNodes().node(fromDiscoNode.getId());
if (fromRoutingNode == null && !fromDiscoNode.isDataNode()) {
if (fromRoutingNode == null && !fromDiscoNode.canContainData()) {
throw new IllegalArgumentException(
"[move_allocation] can't move ["
+ index
Expand All @@ -136,7 +136,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
);
}
RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.getId());
if (toRoutingNode == null && !toDiscoNode.isDataNode()) {
if (toRoutingNode == null && !toDiscoNode.canContainData()) {
throw new IllegalArgumentException(
"[move_allocation] can't move ["
+ index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,11 @@ public NodeEnvironment(Settings settings, Environment environment, IndexStoreLis
applySegmentInfosTrace(settings);
assertCanWrite();

if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.isDataNode(settings)) {
if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.canContainData(settings)) {
ensureAtomicMoveSupported(nodePaths);
}

if (DiscoveryNode.isDataNode(settings) == false) {
if (DiscoveryNode.canContainData(settings) == false) {
if (DiscoveryNode.isClusterManagerNode(settings) == false) {
ensureNoIndexMetadata(nodePaths);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void testExecute(Terminal terminal, OptionSet options, Environment env) throws E
@Override
protected boolean validateBeforeLock(Terminal terminal, Environment env) {
Settings settings = env.settings();
if (DiscoveryNode.isDataNode(settings) && DiscoveryNode.isWarmNode(settings)) {
if (DiscoveryNode.canContainData(settings) && DiscoveryNode.isWarmNode(settings)) {
terminal.println(Terminal.Verbosity.NORMAL, NO_CLEANUP);
return false;
}
Expand All @@ -97,9 +97,9 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env)
throws IOException {
assert DiscoveryNode.isDataNode(env.settings()) == false || DiscoveryNode.isWarmNode(env.settings()) == false;
assert DiscoveryNode.canContainData(env.settings()) == false || DiscoveryNode.isWarmNode(env.settings()) == false;

boolean repurposeData = DiscoveryNode.isDataNode(env.settings()) == false;
boolean repurposeData = DiscoveryNode.canContainData(env.settings()) == false;
boolean repurposeWarm = DiscoveryNode.isWarmNode(env.settings()) == false;

if (DiscoveryNode.isClusterManagerNode(env.settings()) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void start(
assert this.persistedStateRegistry == null : "Persisted state registry should only be set once";
this.persistedStateRegistry = persistedStateRegistry;

if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.isDataNode(settings)) {
if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.canContainData(settings)) {
try {
final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState();

Expand Down Expand Up @@ -205,7 +205,7 @@ public void start(
new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState)
);
}
if (DiscoveryNode.isDataNode(settings)) {
if (DiscoveryNode.canContainData(settings)) {
metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality)
} else {
metaStateService.deleteAll(); // delete legacy files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ static List<IndexMetadataAction> resolveIndexMetadataActions(

// exposed for tests
static Set<Index> getRelevantIndices(ClusterState state) {
assert state.nodes().getLocalNode().isDataNode();
assert state.nodes().getLocalNode().canContainData();
final RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (newRoutingNode == null) {
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ private void setIdFieldDataEnabled(boolean value) {
}

private void updateDanglingIndicesInfo(Index index) {
assert DiscoveryNode.isDataNode(settings) : "dangling indices information should only be persisted on data nodes";
assert DiscoveryNode.canContainData(settings) : "dangling indices information should only be persisted on data nodes";
assert nodeWriteDanglingIndicesInfo : "writing dangling indices info is not enabled";
assert danglingIndicesThreadPoolExecutor != null : "executor for dangling indices info is not available";
if (danglingIndicesToWrite.add(index)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,14 @@ public IndicesClusterStateService(
@Override
protected void doStart() {
// Doesn't make sense to manage shards on non-master and non-data nodes
if (DiscoveryNode.isDataNode(settings) || DiscoveryNode.isClusterManagerNode(settings)) {
if (DiscoveryNode.canContainData(settings) || DiscoveryNode.isClusterManagerNode(settings)) {
clusterService.addHighPriorityApplier(this);
}
}

@Override
protected void doStop() {
if (DiscoveryNode.isDataNode(settings) || DiscoveryNode.isClusterManagerNode(settings)) {
if (DiscoveryNode.canContainData(settings) || DiscoveryNode.isClusterManagerNode(settings)) {
clusterService.removeApplier(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi
@Override
protected void doStart() {
final ClusterService clusterService = indicesService.clusterService();
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
if (DiscoveryNode.canContainData(clusterService.getSettings())) {
clusterService.addListener(this);
}
}

@Override
protected void doStop() {
final ClusterService clusterService = indicesService.clusterService();
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
if (DiscoveryNode.canContainData(clusterService.getSettings())) {
ongoingRecoveries.awaitEmpty();
indicesService.clusterService().removeListener(this);
}
Expand Down
Loading
Loading