Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh OpenSearch nodes version in cluster state after upgrade #865

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.upgrades;

import org.opensearch.Version;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.common.io.Streams;

import java.io.IOException;

/**
 * Rolling-upgrade integration test that inspects the node versions reported by
 * {@code GET /_cat/nodes} once the cluster is fully upgraded.
 */
public class RefreshVersionInClusterStateIT extends AbstractRollingTestCase {

    /**
     * Ensures that, after the upgrade from Elasticsearch/OpenSearch has completed, every node listed by
     * {@code _cat/nodes} reports {@link Version#CURRENT}. Nothing is verified while the cluster is still
     * in the {@code OLD} or {@code MIXED} phase.
     *
     * @throws IOException if the request fails or the response body cannot be read
     */
    public void testRefresh() throws IOException {
        switch (CLUSTER_TYPE) {
            case OLD:
            case MIXED:
                // nothing to check until the whole cluster has been upgraded
                break;
            case UPGRADED:
                Response response = client().performRequest(new Request("GET", "/_cat/nodes?h=id,version"));
                for (String nodeLine : Streams.readAllLines(response.getEntity().getContent())) {
                    // trim first so that column padding cannot shift the id/version indices
                    String[] elements = nodeLine.trim().split(" +");
                    // fail with a readable message instead of an ArrayIndexOutOfBoundsException
                    assertTrue("unexpected _cat/nodes line [" + nodeLine + "]", elements.length >= 2);
                    // expected value goes first so that a failure message reads correctly
                    assertEquals("node [" + elements[0] + "] reports an unexpected version",
                        Version.CURRENT, Version.fromString(elements[1]));
                }
                break;
            default:
                throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public class JoinHelper {
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService,
transportService) {

private final long term = currentTermSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.apache.logging.log4j.Logger;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterState;
Expand All @@ -48,6 +49,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.zen.ElectMasterService;
import org.opensearch.persistent.PersistentTasksCustomMetadata;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -67,6 +69,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut

private final Logger logger;
private final RerouteService rerouteService;
private final TransportService transportService;

private final int minimumMasterNodesOnLocalNode;

Expand Down Expand Up @@ -105,11 +108,13 @@ public boolean isFinishElectionTask() {
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}

public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, RerouteService rerouteService) {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger,
RerouteService rerouteService, TransportService transportService) {
this.allocationService = allocationService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
this.rerouteService = rerouteService;
this.transportService = transportService;
}

@Override
Expand Down Expand Up @@ -150,7 +155,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
for (final Task joinTask : joiningNodes) {
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
// noop
} else if (currentNodes.nodeExistsWithSameRoles(joinTask.node())) {
} else if (currentNodes.nodeExistsWithSameRoles(joinTask.node()) && !currentNodes.nodeExistsWithBWCVersion(joinTask.node())) {
logger.debug("received a join request for an existing node [{}]", joinTask.node());
} else {
final DiscoveryNode node = joinTask.node();
Expand Down Expand Up @@ -222,8 +227,10 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());

for (final Task joinTask : joiningNodes) {
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
// noop
if (joinTask.isBecomeMasterTask()) {
refreshDiscoveryNodeVersionAfterUpgrade(currentNodes, nodesBuilder);
} else if (joinTask.isFinishElectionTask()) {
//no-op
} else {
final DiscoveryNode joiningNode = joinTask.node();
final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId());
Expand Down Expand Up @@ -254,6 +261,45 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

private void refreshDiscoveryNodeVersionAfterUpgrade(DiscoveryNodes currentNodes, DiscoveryNodes.Builder nodesBuilder) {
// During the upgrade from Elasticsearch, OpenSearch node send their version as 7.10.2 to Elasticsearch master
// in order to successfully join the cluster. But as soon as OpenSearch node becomes the master, cluster state
// should show the OpenSearch nodes version as 1.x. As the cluster state was carry forwarded from ES master,
// version in DiscoveryNode is stale 7.10.2. As soon as OpenSearch node becomes master, it can refresh the
// DiscoveryNodes version and publish the updated state while finishing the election. This helps in atomically
// updating the version of those node which have connection with the new master.
// Note: This should get deprecated with BWC mode logic
if(null == transportService) {
// this logic is only applicable when OpenSearch node is master and is noop for zen discovery node
return;
}
if(currentNodes.getMinNodeVersion().before(Version.V_1_0_0)) {
Map<String, Version> channelVersions = transportService.getChannelVersion(currentNodes);
for (DiscoveryNode node : currentNodes) {
if(channelVersions.containsKey(node.getId())) {
if (channelVersions.get(node.getId()) != node.getVersion()) {
DiscoveryNode tmpNode = nodesBuilder.get(node.getId());
nodesBuilder.remove(node.getId());
nodesBuilder.add(new DiscoveryNode(tmpNode.getName(), tmpNode.getId(), tmpNode.getEphemeralId(),
tmpNode.getHostName(), tmpNode.getHostAddress(), tmpNode.getAddress(), tmpNode.getAttributes(),
tmpNode.getRoles(), channelVersions.get(tmpNode.getId())));
logger.info("Refreshed the DiscoveryNode version for node {}:{} from {} to {}",
node.getId(), node.getAddress(), node.getVersion(), channelVersions.get(tmpNode.getId()));
}
} else {
// in case existing OpenSearch node is present in the cluster and but there is no connection to that node yet,
// either that node will send new JoinRequest to the master with version >=1.0, then no issue or
// there is an edge case if doesn't send JoinRequest and connection is established,
// then it can continue to report version as 7.10.2 instead of actual OpenSearch version. So,
// removing the node from cluster state to prevent stale version reporting and let it reconnect.
if (node.getVersion().equals(LegacyESVersion.V_7_10_2)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it not be any other legacy version e.g. 6.8.x or 7.10.x which we are upgrading from?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're only concerned about the OpenSearch nodes in BWC mode, any other ES node that is not yet upgraded should run in mixed cluster - is that correct understanding?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For other ES versions, the version in cluster state and channel version would always be same. So, there is no need for special handling. Its only for OpenSearch nodes which spoof to be 7.10.2 and we don't know if it is real 7.10.2 or OpenSearch node spoofing its version.

nodesBuilder.remove(node.getId());
}
}
}
}
}

@Override
public boolean runOnlyOnMaster() {
// we validate that we are allowed to change the cluster state during cluster state processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}
}
if (out.getVersion().before(Version.V_1_0_0)) {
if (out.getVersion().before(Version.V_1_0_0) && version.onOrAfter(Version.V_1_0_0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please help me understand why we're adding this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DiscoveryNode writeTo serialization method can be triggered from multiple places, e.g. when the cluster state is sent from one node to another, and the BWC version should be set only for OpenSearch nodes. Without this check, it would also overwrite the version of the Elasticsearch nodes in the cluster, which should remain untouched.

Version.writeVersion(LegacyESVersion.V_7_10_2, out);
} else {
Version.writeVersion(version, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
Expand Down Expand Up @@ -226,6 +227,16 @@ public boolean nodeExistsWithSameRoles(DiscoveryNode discoveryNode) {
return existing != null && existing.equals(discoveryNode) && existing.getRoles().equals(discoveryNode.getRoles());
}

/**
 * Checks whether the given node is already known under the BWC version while the given instance carries
 * an OpenSearch version. During an upgrade from Elasticsearch, OpenSearch nodes run in BWC mode, so the
 * cluster state handed over from the older master to the OpenSearch master can still list them as 7.10.2.
 *
 * @param discoveryNode the node to look up by its id
 * @return {@code true} if a node with the same id exists, equals {@code discoveryNode}, is recorded with
 *         version 7.10.2, and {@code discoveryNode} itself reports a version on or after 1.0.0
 */
public boolean nodeExistsWithBWCVersion(DiscoveryNode discoveryNode) {
    final DiscoveryNode known = nodes.get(discoveryNode.getId());
    if (known == null || known.equals(discoveryNode) == false) {
        return false;
    }
    final boolean recordedAsLegacy = known.getVersion().equals(LegacyESVersion.V_7_10_2);
    return recordedAsLegacy && discoveryNode.getVersion().onOrAfter(Version.V_1_0_0);
}

/**
* Get the id of the master node
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class NodeJoinController {
public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService,
ElectMasterService electMaster, RerouteService rerouteService) {
this.masterService = masterService;
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService, null) {
@Override
public void clusterStatePublished(ClusterChangedEvent event) {
electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.client.transport.TransportClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -705,6 +707,18 @@ public Transport.Connection getConnection(DiscoveryNode node) {
}
}

/**
 * Collects, per node id, the version reported by the connection to that node.
 *
 * @param nodes the nodes to look up connections for
 * @return a map from node id to connection version; nodes for which the lookup threw are left out
 */
public Map<String, Version> getChannelVersion(DiscoveryNodes nodes) {
    final Map<String, Version> versionsByNodeId = new HashMap<>(nodes.getSize());
    for (final DiscoveryNode candidate : nodes) {
        try {
            final String nodeId = candidate.getId();
            final Version connectionVersion = connectionManager.getConnection(candidate).getVersion();
            versionsByNodeId.putIfAbsent(nodeId, connectionVersion);
        } catch (Exception ignored) {
            // best effort: a node that is not connected simply gets no entry
        }
    }
    return versionsByNodeId;
}

public final <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import static org.mockito.Matchers.anyBoolean;
import static org.opensearch.test.VersionUtils.getPreviousVersion;
import static org.opensearch.test.VersionUtils.incompatibleFutureVersion;
import static org.opensearch.test.VersionUtils.maxCompatibleVersion;
Expand Down Expand Up @@ -173,7 +177,7 @@ public void testUpdatesNodeWithNewRoles() throws Exception {
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService);
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService, null);

final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);

Expand All @@ -196,4 +200,61 @@ public void testUpdatesNodeWithNewRoles() throws Exception {

assertThat(result.resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles()));
}

/**
 * Verifies how node versions are refreshed when a become-master task is executed against a cluster state
 * that still lists every node with a legacy version: nodes with a newer channel version are updated,
 * 7.10.2 nodes without a channel version are dropped unless they re-join, and all other nodes are kept.
 */
public void testUpdatesNodeWithOpenSearchVersionForExistingAndNewNodes() throws Exception {
    // During the upgrade from Elasticsearch, OpenSearch nodes send their version as 7.10.2 to the Elasticsearch
    // master in order to successfully join the cluster. But as soon as an OpenSearch node becomes the master, the
    // cluster state should show the OpenSearch nodes' version as 1.x. As the cluster state was carried forward from
    // the ES master, the version in DiscoveryNode is a stale 7.10.2.
    final AllocationService allocationService = mock(AllocationService.class);
    when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
    when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then(
        invocationOnMock -> invocationOnMock.getArguments()[0]);
    final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
    final Map<String, Version> channelVersions = new HashMap<>();
    final String node1 = UUIDs.base64UUID(); // OpenSearch node running BWC version
    final String node2 = UUIDs.base64UUID(); // OpenSearch node running BWC version
    final String node3 = UUIDs.base64UUID(); // OpenSearch node running BWC version, sending new join request and no active channel
    final String node4 = UUIDs.base64UUID(); // ES node 7.10.2
    final String node5 = UUIDs.base64UUID(); // ES node 7.10.2 in cluster but missing channel version
    final String node6 = UUIDs.base64UUID(); // ES node 7.9.0
    final String node7 = UUIDs.base64UUID(); // ES node 7.9.0 in cluster but missing channel version
    // the channels of the OpenSearch nodes report the real (current) version, not the spoofed legacy one
    channelVersions.put(node1, Version.CURRENT);
    channelVersions.put(node2, Version.CURRENT);
    channelVersions.put(node4, LegacyESVersion.V_7_10_2);
    channelVersions.put(node6, LegacyESVersion.V_7_9_0);

    final TransportService transportService = mock(TransportService.class);
    when(transportService.getChannelVersion(any())).thenReturn(channelVersions);
    final DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().localNodeId(node1);
    nodes.add(new DiscoveryNode(node1, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
    nodes.add(new DiscoveryNode(node2, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
    nodes.add(new DiscoveryNode(node3, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
    nodes.add(new DiscoveryNode(node4, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
    nodes.add(new DiscoveryNode(node5, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
    nodes.add(new DiscoveryNode(node6, buildNewFakeTransportAddress(), LegacyESVersion.V_7_9_0));
    nodes.add(new DiscoveryNode(node7, buildNewFakeTransportAddress(), LegacyESVersion.V_7_9_0));
    final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build();
    final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger,
        rerouteService, transportService);
    // node3 re-joins with the same identity but its real version
    final DiscoveryNode existingNode3 = clusterState.nodes().get(node3);
    final DiscoveryNode node3NewJoin = new DiscoveryNode(existingNode3.getName(), existingNode3.getId(),
        existingNode3.getEphemeralId(), existingNode3.getHostName(), existingNode3.getHostAddress(),
        existingNode3.getAddress(), existingNode3.getAttributes(), existingNode3.getRoles(), Version.CURRENT);

    final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result
        = joinTaskExecutor.execute(clusterState, List.of(new JoinTaskExecutor.Task(node3NewJoin, "test"),
            JoinTaskExecutor.newBecomeMasterTask(), JoinTaskExecutor.newFinishElectionTask()));
    final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
    assertTrue(taskResult.isSuccess());
    final DiscoveryNodes resultNodes = result.resultingState.getNodes();
    // expected value first, actual value second, so that failure messages read correctly
    assertEquals(Version.CURRENT, resultNodes.get(node1).getVersion());
    assertEquals(Version.CURRENT, resultNodes.get(node2).getVersion());
    assertEquals(Version.CURRENT, resultNodes.get(node3).getVersion()); // 7.10.2 in old state but sent new join and processed
    assertEquals(LegacyESVersion.V_7_10_2, resultNodes.get(node4).getVersion());
    assertFalse(resultNodes.nodeExists(node5)); // 7.10.2 node without active channel will be removed and should rejoin
    assertEquals(LegacyESVersion.V_7_9_0, resultNodes.get(node6).getVersion());
    // 7.9.0 node without active channel but shouldn't get removed
    assertEquals(LegacyESVersion.V_7_9_0, resultNodes.get(node7).getVersion());
}
}
Loading