Skip to content

Commit 9496aa1

Browse files
author
Rahul Karajgikar
committed
Add custom connect to node for handleJoinRequest + info logs + comments
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent 9194b7a commit 9496aa1

File tree

16 files changed

+606
-42
lines changed

16 files changed

+606
-42
lines changed
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.cluster.coordination;
34+
35+
import java.util.Arrays;
36+
import java.util.Collection;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
39+
import org.opensearch.cluster.ClusterChangedEvent;
40+
import org.opensearch.cluster.ClusterStateApplier;
41+
import org.opensearch.cluster.NodeConnectionsService;
42+
import org.opensearch.cluster.metadata.IndexMetadata;
43+
import org.opensearch.cluster.node.DiscoveryNode;
44+
import org.opensearch.cluster.service.ClusterService;
45+
import org.opensearch.common.settings.Settings;
46+
import org.opensearch.index.MockEngineFactoryPlugin;
47+
import org.opensearch.indices.recovery.RecoverySettings;
48+
import org.opensearch.plugins.Plugin;
49+
import org.opensearch.tasks.Task;
50+
import org.opensearch.test.InternalSettingsPlugin;
51+
import org.opensearch.test.OpenSearchIntegTestCase;
52+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
53+
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
54+
import org.opensearch.test.store.MockFSIndexStore;
55+
import org.opensearch.test.transport.MockTransportService;
56+
import org.opensearch.test.transport.StubbableTransport;
57+
import org.opensearch.transport.*;
58+
59+
import static org.hamcrest.Matchers.is;
60+
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
61+
62+
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
63+
public class NodeJoinLeftIT extends OpenSearchIntegTestCase {
64+
65+
// Fixture constants for this test class.
// NOTE(review): none of these constants is referenced anywhere else in the class as shown
// (the test uses a local index name and literal shard/replica counts) - confirm whether
// they are still needed or are leftovers from a copied test.
private static final String INDEX_NAME = "test-idx-1";
66+
private static final String REPO_NAME = "test-repo-1";
67+
private static final String SNAP_NAME = "test-snap-1";
68+
69+
private static final int MIN_DOC_COUNT = 500;
70+
private static final int MAX_DOC_COUNT = 1000;
71+
private static final int SHARD_COUNT = 1;
72+
private static final int REPLICA_COUNT = 0;
73+
74+
@Override
75+
protected Collection<Class<? extends Plugin>> nodePlugins() {
76+
return Arrays.asList(
77+
MockTransportService.TestPlugin.class,
78+
MockFSIndexStore.TestPlugin.class,
79+
InternalSettingsPlugin.class,
80+
MockEngineFactoryPlugin.class
81+
);
82+
}
83+
84+
@Override
85+
protected void beforeIndexDeletion() throws Exception {
86+
super.beforeIndexDeletion();
87+
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
88+
internalCluster().assertSeqNos();
89+
internalCluster().assertSameDocIdsOnShards();
90+
}
91+
92+
public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
93+
final String indexName = "test";
94+
final Settings nodeSettings = Settings.builder()
95+
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")
96+
.put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s")
97+
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms")
98+
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
99+
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
100+
.build();
101+
// start a cluster-manager node
102+
final String cm =internalCluster().startNode(nodeSettings);
103+
104+
System.out.println("--> spawning node t1");
105+
final String blueNodeName = internalCluster().startNode(
106+
Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()
107+
);
108+
System.out.println("--> spawning node t2");
109+
final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
110+
111+
System.out.println("--> initial health check");
112+
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
113+
assertThat(response.isTimedOut(), is(false));
114+
System.out.println("--> done initial health check");
115+
116+
System.out.println("--> creating index");
117+
client().admin()
118+
.indices()
119+
.prepareCreate(indexName)
120+
.setSettings(
121+
Settings.builder()
122+
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue")
123+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
124+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
125+
)
126+
.get();
127+
System.out.println("--> done creating index");
128+
MockTransportService cmTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, cm);
129+
MockTransportService redTransportService =
130+
(MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
131+
132+
ClusterService cmClsService = internalCluster().getInstance(ClusterService.class, cm);
133+
// simulate a slow applier on the cm
134+
cmClsService.addStateApplier(new ClusterStateApplier() {
135+
@Override
136+
public void applyClusterState(ClusterChangedEvent event) {
137+
if (event.nodesRemoved()) {
138+
try {
139+
Thread.sleep(3000);
140+
} catch (InterruptedException e) {
141+
throw new RuntimeException(e);
142+
}
143+
}
144+
}
145+
});
146+
cmTransportService.connectionManager().addListener(new TransportConnectionListener() {
147+
148+
@Override
149+
public void onConnectionOpened(Transport.Connection connection) {
150+
// try {
151+
// Thread.sleep(500);
152+
// } catch (InterruptedException e) {
153+
// throw new RuntimeException(e);
154+
// }
155+
156+
}
157+
158+
@Override
159+
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
160+
// if (node.getName().equals("node_t2")) {
161+
// try {
162+
// Thread.sleep(250);
163+
// } catch (InterruptedException e) {
164+
// throw new RuntimeException(e);
165+
// }
166+
// }
167+
}
168+
169+
// @Override
170+
// public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
171+
// try {
172+
// Thread.sleep(5000);
173+
// } catch (InterruptedException e) {
174+
// throw new RuntimeException(e);
175+
// }
176+
// }
177+
});
178+
AtomicBoolean bb = new AtomicBoolean();
179+
// simulate followerchecker failure
180+
181+
ConnectionDelay handlingBehavior = new ConnectionDelay(
182+
FOLLOWER_CHECK_ACTION_NAME,
183+
()->{
184+
if (bb.get()) {
185+
return;
186+
}
187+
try {
188+
Thread.sleep(10);
189+
} catch (InterruptedException e) {
190+
throw new RuntimeException(e);
191+
}
192+
throw new NodeHealthCheckFailureException("non writable exception");
193+
});
194+
redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, handlingBehavior);
195+
196+
197+
// for (int i=0 ;i < 1; i++) {
198+
// //cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
199+
// System.out.println("--> follower check, iteration: " + i);
200+
// bb.set(true); // pass followerchecker
201+
// System.out.println("--> setting bb to true, sleeping for 1500ms, iteration: " + i);
202+
// Thread.sleep(1500);
203+
// bb.set(false); // fail followerchecker
204+
// System.out.println("--> setting bb to false, iteration: " + i);
205+
// System.out.println("--> checking cluster health 2 nodes, iteration: " + i);
206+
// ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
207+
// assertThat(response1.isTimedOut(), is(false));
208+
// System.out.println("--> completed checking cluster health 2 nodes, iteration: " + i);
209+
// //internalCluster().stopRandomNode(InternalTestCluster.nameFilter(blueNodeName));
210+
// System.out.println("--> checking cluster health 3 nodes, iteration: " + i);
211+
// ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
212+
// assertThat(response2.isTimedOut(), is(false));
213+
// System.out.println("--> completed checking cluster health 3 nodes, iteration: " + i);
214+
// }
215+
// for (int i=0 ;i < 1; i++) {
216+
//
217+
// bb.set(true); // pass followerchecker
218+
//
219+
// Thread.sleep(1500);
220+
// System.out.println("--> manually disconnecting node, iteration: " + i);
221+
// cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
222+
// }
223+
224+
// FAILS WITHOUT CODE CHANGES
225+
for (int i=0 ; i < 10; i++) {
226+
bb.set(false); // fail followerchecker by force to trigger node disconnect
227+
System.out.println("--> disconnecting from red node, iteration: " + i);
228+
// cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
229+
// now followerchecker should fail and trigger node left
230+
System.out.println("--> checking cluster health 2 nodes, iteration: " + i);
231+
ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
232+
assertThat(response1.isTimedOut(), is(false));
233+
System.out.println("--> completed checking cluster health 2 nodes, iteration: " + i);
234+
235+
// once we know a node has left, we can re-enable followerchecker to work normally
236+
bb.set(true);
237+
Thread.sleep(1500);
238+
System.out.println("--> checking cluster health 3 nodes, iteration: " + i);
239+
ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
240+
assertThat(response2.isTimedOut(), is(false));
241+
System.out.println("--> completed checking cluster health 3 nodes, iteration: " + i);
242+
243+
Thread.sleep(1500);
244+
245+
// Checking again
246+
System.out.println("--> checking cluster health 3 nodes again, iteration: " + i);
247+
ClusterHealthResponse response3 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
248+
assertThat(response3.isTimedOut(), is(false));
249+
System.out.println("--> completed checking cluster health 3 nodes again, iteration: " + i);
250+
}
251+
252+
bb.set(true);
253+
System.out.println("-->first validation outside loop");
254+
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
255+
assertThat(response.isTimedOut(), is(false));
256+
257+
System.out.println("-->sleeping for 20s");
258+
Thread.sleep(20000);
259+
260+
System.out.println("-->second validation outside loop after sleep");
261+
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
262+
assertThat(response.isTimedOut(), is(false));
263+
}
264+
private class ConnectionDelay implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {
265+
266+
private final String actionName;
267+
private final Runnable connectionBreaker;
268+
269+
private ConnectionDelay(
270+
String actionName,
271+
Runnable connectionBreaker
272+
) {
273+
this.actionName = actionName;
274+
this.connectionBreaker = connectionBreaker;
275+
}
276+
277+
@Override
278+
public void messageReceived(
279+
TransportRequestHandler<TransportRequest> handler,
280+
TransportRequest request,
281+
TransportChannel channel,
282+
Task task
283+
) throws Exception {
284+
285+
286+
connectionBreaker.run();
287+
handler.messageReceived(request, channel, task);
288+
}
289+
}
290+
}

server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,33 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
172172
}
173173

174174
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
175+
logger.info("NodeConnectionsService - disconnecting from node [{}] in loop", discoveryNode);
176+
runnables.add(targetsByNode.get(discoveryNode).disconnect());
177+
}
178+
}
179+
runnables.forEach(Runnable::run);
180+
}
181+
182+
public void markPendingJoinsAsComplete(List<DiscoveryNode> nodesConnected) {
183+
for (final DiscoveryNode discoveryNode: nodesConnected) {
184+
transportService.markPendingJoinAsCompleted(discoveryNode);
185+
}
186+
}
187+
public void disconnectFromNonBlockedNodesExcept(DiscoveryNodes discoveryNodes, DiscoveryNodes.Delta nodesDelta) {
188+
final List<Runnable> runnables = new ArrayList<>();
189+
synchronized (mutex) {
190+
final Set<DiscoveryNode> nodesToDisconnect = new HashSet<>(targetsByNode.keySet());
191+
for (final DiscoveryNode discoveryNode : discoveryNodes) {
192+
nodesToDisconnect.remove(discoveryNode);
193+
}
194+
195+
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
196+
// if node is trying to be disconnected (node-left) and pendingjoin , skip disconnect and then remove the blocking
197+
if (transportService.getNodesJoinInProgress().contains(discoveryNode)) {
198+
logger.info("Skipping disconnection for node [{}] as it has a join in progress", discoveryNode);
199+
continue;
200+
}
201+
logger.info("NodeConnectionsService - disconnecting from node [{}] in loop", discoveryNode);
175202
runnables.add(targetsByNode.get(discoveryNode).disconnect());
176203
}
177204
}
@@ -388,9 +415,10 @@ public String toString() {
388415
@Override
389416
protected void doRun() {
390417
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
418+
logger.info("disconnecting from {}", discoveryNode);
391419
transportService.disconnectFromNode(discoveryNode);
392420
consecutiveFailureCount.set(0);
393-
logger.debug("disconnected from {}", discoveryNode);
421+
logger.info("disconnected from {}", discoveryNode);
394422
onCompletion(ActivityType.DISCONNECTING, null, connectActivity);
395423
}
396424

@@ -419,6 +447,7 @@ Runnable connect(@Nullable ActionListener<Void> listener) {
419447
}
420448

421449
Runnable disconnect() {
450+
logger.info("running runnable disconnect");
422451
return addListenerAndStartActivity(
423452
null,
424453
ActivityType.DISCONNECTING,

0 commit comments

Comments
 (0)