Skip to content

Commit

Permalink
Refresh OpenSearch nodes version in cluster state after upgrade (#865)
Browse files Browse the repository at this point in the history
Signed-off-by: Shweta Thareja <tharejas@amazon.com>

Co-authored-by: Shweta Thareja <tharejas@amazon.com>
  • Loading branch information
shwetathareja and Shweta Thareja authored Jul 2, 2021
1 parent 4a1add9 commit 8082604
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.upgrades;

import org.opensearch.Version;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.common.io.Streams;

import java.io.IOException;

public class RefreshVersionInClusterStateIT extends AbstractRollingTestCase {

/*
This test ensures that after the upgrade from ElasticSearch/ OpenSearch all nodes report the version on and after 1.0.0
*/
public void testRefresh() throws IOException {
switch (CLUSTER_TYPE) {
case OLD:
case MIXED:
break;
case UPGRADED:
Response response = client().performRequest(new Request("GET", "/_cat/nodes?h=id,version"));
for (String nodeLine : Streams.readAllLines(response.getEntity().getContent())) {
String[] elements = nodeLine.split(" +");
assertEquals(Version.fromString(elements[1]), Version.CURRENT);
}
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public class JoinHelper {
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService,
transportService) {

private final long term = currentTermSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.apache.logging.log4j.Logger;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterState;
Expand All @@ -48,6 +49,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.zen.ElectMasterService;
import org.opensearch.persistent.PersistentTasksCustomMetadata;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -67,6 +69,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut

private final Logger logger;
private final RerouteService rerouteService;
private final TransportService transportService;

private final int minimumMasterNodesOnLocalNode;

Expand Down Expand Up @@ -105,11 +108,13 @@ public boolean isFinishElectionTask() {
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}

public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, RerouteService rerouteService) {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger,
RerouteService rerouteService, TransportService transportService) {
this.allocationService = allocationService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
this.rerouteService = rerouteService;
this.transportService = transportService;
}

@Override
Expand Down Expand Up @@ -150,7 +155,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
for (final Task joinTask : joiningNodes) {
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
// noop
} else if (currentNodes.nodeExistsWithSameRoles(joinTask.node())) {
} else if (currentNodes.nodeExistsWithSameRoles(joinTask.node()) && !currentNodes.nodeExistsWithBWCVersion(joinTask.node())) {
logger.debug("received a join request for an existing node [{}]", joinTask.node());
} else {
final DiscoveryNode node = joinTask.node();
Expand Down Expand Up @@ -222,8 +227,10 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());

for (final Task joinTask : joiningNodes) {
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
// noop
if (joinTask.isBecomeMasterTask()) {
refreshDiscoveryNodeVersionAfterUpgrade(currentNodes, nodesBuilder);
} else if (joinTask.isFinishElectionTask()) {
//no-op
} else {
final DiscoveryNode joiningNode = joinTask.node();
final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId());
Expand Down Expand Up @@ -254,6 +261,45 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

private void refreshDiscoveryNodeVersionAfterUpgrade(DiscoveryNodes currentNodes, DiscoveryNodes.Builder nodesBuilder) {
// During the upgrade from Elasticsearch, OpenSearch node send their version as 7.10.2 to Elasticsearch master
// in order to successfully join the cluster. But as soon as OpenSearch node becomes the master, cluster state
// should show the OpenSearch nodes version as 1.x. As the cluster state was carry forwarded from ES master,
// version in DiscoveryNode is stale 7.10.2. As soon as OpenSearch node becomes master, it can refresh the
// DiscoveryNodes version and publish the updated state while finishing the election. This helps in atomically
// updating the version of those node which have connection with the new master.
// Note: This should get deprecated with BWC mode logic
if(null == transportService) {
// this logic is only applicable when OpenSearch node is master and is noop for zen discovery node
return;
}
if(currentNodes.getMinNodeVersion().before(Version.V_1_0_0)) {
Map<String, Version> channelVersions = transportService.getChannelVersion(currentNodes);
for (DiscoveryNode node : currentNodes) {
if(channelVersions.containsKey(node.getId())) {
if (channelVersions.get(node.getId()) != node.getVersion()) {
DiscoveryNode tmpNode = nodesBuilder.get(node.getId());
nodesBuilder.remove(node.getId());
nodesBuilder.add(new DiscoveryNode(tmpNode.getName(), tmpNode.getId(), tmpNode.getEphemeralId(),
tmpNode.getHostName(), tmpNode.getHostAddress(), tmpNode.getAddress(), tmpNode.getAttributes(),
tmpNode.getRoles(), channelVersions.get(tmpNode.getId())));
logger.info("Refreshed the DiscoveryNode version for node {}:{} from {} to {}",
node.getId(), node.getAddress(), node.getVersion(), channelVersions.get(tmpNode.getId()));
}
} else {
// in case existing OpenSearch node is present in the cluster and but there is no connection to that node yet,
// either that node will send new JoinRequest to the master with version >=1.0, then no issue or
// there is an edge case if doesn't send JoinRequest and connection is established,
// then it can continue to report version as 7.10.2 instead of actual OpenSearch version. So,
// removing the node from cluster state to prevent stale version reporting and let it reconnect.
if (node.getVersion().equals(LegacyESVersion.V_7_10_2)) {
nodesBuilder.remove(node.getId());
}
}
}
}
}

@Override
public boolean runOnlyOnMaster() {
// we validate that we are allowed to change the cluster state during cluster state processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}
}
if (out.getVersion().before(Version.V_1_0_0)) {
if (out.getVersion().before(Version.V_1_0_0) && version.onOrAfter(Version.V_1_0_0)) {
Version.writeVersion(LegacyESVersion.V_7_10_2, out);
} else {
Version.writeVersion(version, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
Expand Down Expand Up @@ -226,6 +227,16 @@ public boolean nodeExistsWithSameRoles(DiscoveryNode discoveryNode) {
return existing != null && existing.equals(discoveryNode) && existing.getRoles().equals(discoveryNode.getRoles());
}

/**
* Determine if the given node exists and has the right version. During upgrade from Elasticsearch version as OpenSearch node run in
* BWC mode and can have the version as 7.10.2 in cluster state from older master to OpenSearch master.
*/
public boolean nodeExistsWithBWCVersion(DiscoveryNode discoveryNode) {
final DiscoveryNode existing = nodes.get(discoveryNode.getId());
return existing != null && existing.equals(discoveryNode) &&
existing.getVersion().equals(LegacyESVersion.V_7_10_2) && discoveryNode.getVersion().onOrAfter(Version.V_1_0_0);
}

/**
* Get the id of the master node
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class NodeJoinController {
public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService,
ElectMasterService electMaster, RerouteService rerouteService) {
this.masterService = masterService;
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService, null) {
@Override
public void clusterStatePublished(ClusterChangedEvent event) {
electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.client.transport.TransportClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -705,6 +707,18 @@ public Transport.Connection getConnection(DiscoveryNode node) {
}
}

public Map<String, Version> getChannelVersion(DiscoveryNodes nodes) {
Map<String, Version> nodeChannelVersions = new HashMap<>(nodes.getSize());
for (DiscoveryNode node: nodes) {
try {
nodeChannelVersions.putIfAbsent(node.getId(), connectionManager.getConnection(node).getVersion());
} catch (Exception e) {
// ignore in case node is not connected
}
}
return nodeChannelVersions;
}

public final <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import static org.mockito.Matchers.anyBoolean;
import static org.opensearch.test.VersionUtils.getPreviousVersion;
import static org.opensearch.test.VersionUtils.incompatibleFutureVersion;
import static org.opensearch.test.VersionUtils.maxCompatibleVersion;
Expand Down Expand Up @@ -173,7 +177,7 @@ public void testUpdatesNodeWithNewRoles() throws Exception {
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService);
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService, null);

final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);

Expand All @@ -196,4 +200,61 @@ public void testUpdatesNodeWithNewRoles() throws Exception {

assertThat(result.resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles()));
}

public void testUpdatesNodeWithOpenSearchVersionForExistingAndNewNodes() throws Exception {
// During the upgrade from Elasticsearch, OpenSearch node send their version as 7.10.2 to Elasticsearch master
// in order to successfully join the cluster. But as soon as OpenSearch node becomes the master, cluster state
// should show the OpenSearch nodes version as 1.x. As the cluster state was carry forwarded from ES master,
// version in DiscoveryNode is stale 7.10.2.
final AllocationService allocationService = mock(AllocationService.class);
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then(
invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
Map<String, Version> channelVersions = new HashMap<>();
String node_1 = UUIDs.base64UUID(); // OpenSearch node running BWC version
String node_2 = UUIDs.base64UUID(); // OpenSearch node running BWC version
String node_3 = UUIDs.base64UUID(); // OpenSearch node running BWC version, sending new join request and no active channel
String node_4 = UUIDs.base64UUID(); // ES node 7.10.2
String node_5 = UUIDs.base64UUID(); // ES node 7.10.2 in cluster but missing channel version
String node_6 = UUIDs.base64UUID(); // ES node 7.9.0
String node_7 = UUIDs.base64UUID(); // ES node 7.9.0 in cluster but missing channel version
channelVersions.put(node_1, LegacyESVersion.CURRENT);
channelVersions.put(node_2, LegacyESVersion.CURRENT);
channelVersions.put(node_4, LegacyESVersion.V_7_10_2);
channelVersions.put(node_6, LegacyESVersion.V_7_9_0);

final TransportService transportService = mock(TransportService.class);
when(transportService.getChannelVersion(any())).thenReturn(channelVersions);
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().localNodeId(node_1);
nodes.add(new DiscoveryNode(node_1, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
nodes.add(new DiscoveryNode(node_2, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
nodes.add(new DiscoveryNode(node_3, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
nodes.add(new DiscoveryNode(node_4, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
nodes.add(new DiscoveryNode(node_5, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2));
nodes.add(new DiscoveryNode(node_6, buildNewFakeTransportAddress(), LegacyESVersion.V_7_9_0));
nodes.add(new DiscoveryNode(node_7, buildNewFakeTransportAddress(), LegacyESVersion.V_7_9_0));
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build();
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger,
rerouteService, transportService);
final DiscoveryNode existing_node_3 = clusterState.nodes().get(node_3);
final DiscoveryNode node_3_new_join = new DiscoveryNode(existing_node_3.getName(), existing_node_3.getId(),
existing_node_3.getEphemeralId(), existing_node_3.getHostName(), existing_node_3.getHostAddress(),
existing_node_3.getAddress(), existing_node_3.getAttributes(), existing_node_3.getRoles(), Version.CURRENT);

final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result
= joinTaskExecutor.execute(clusterState, List.of(new JoinTaskExecutor.Task(node_3_new_join, "test"),
JoinTaskExecutor.newBecomeMasterTask(), JoinTaskExecutor.newFinishElectionTask()));
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
assertTrue(taskResult.isSuccess());
DiscoveryNodes resultNodes = result.resultingState.getNodes();
assertEquals(resultNodes.get(node_1).getVersion(), Version.CURRENT);
assertEquals(resultNodes.get(node_2).getVersion(), Version.CURRENT);
assertEquals(resultNodes.get(node_3).getVersion(), Version.CURRENT); // 7.10.2 in old state but sent new join and processed
assertEquals(resultNodes.get(node_4).getVersion(), LegacyESVersion.V_7_10_2);
assertFalse(resultNodes.nodeExists(node_5)); // 7.10.2 node without active channel will be removed and should rejoin
assertEquals(resultNodes.get(node_6).getVersion(), LegacyESVersion.V_7_9_0);
// 7.9.0 node without active channel but shouldn't get removed
assertEquals(resultNodes.get(node_7).getVersion(), LegacyESVersion.V_7_9_0);
}
}
Loading

0 comments on commit 8082604

Please sign in to comment.