Skip to content

Commit 5e4a0e7

Browse files
committed
remove zen discovery
1 parent 756dc42 commit 5e4a0e7

35 files changed

+12
-8682
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
25-
import org.elasticsearch.cluster.ClusterName;
26-
import org.elasticsearch.cluster.ClusterState;
2725
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
2826
import org.elasticsearch.cluster.node.DiscoveryNode;
2927
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -33,7 +31,6 @@
3331
import org.elasticsearch.common.settings.Settings;
3432
import org.elasticsearch.common.unit.TimeValue;
3533
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
36-
import org.elasticsearch.discovery.zen.NodesFaultDetection;
3734
import org.elasticsearch.threadpool.ThreadPool.Names;
3835
import org.elasticsearch.transport.ConnectTransportException;
3936
import org.elasticsearch.transport.TransportChannel;
@@ -114,10 +111,6 @@ public FollowersChecker(Settings settings, TransportService transportService,
114111
updateFastResponseState(0, Mode.CANDIDATE);
115112
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, false, false, FollowerCheckRequest::new,
116113
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
117-
transportService.registerRequestHandler(
118-
NodesFaultDetection.PING_ACTION_NAME, NodesFaultDetection.PingRequest::new, Names.SAME, false, false,
119-
(request, channel, task) -> // TODO: check that we're a follower of the requesting node?
120-
channel.sendResponse(new NodesFaultDetection.PingResponse()));
121114
transportService.addConnectionListener(new TransportConnectionListener() {
122115
@Override
123116
public void onNodeDisconnected(DiscoveryNode node) {
@@ -304,17 +297,7 @@ private void handleWakeUp() {
304297
final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode());
305298
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);
306299

307-
final String actionName;
308-
final TransportRequest transportRequest;
309-
if (Coordinator.isZen1Node(discoveryNode)) {
310-
actionName = NodesFaultDetection.PING_ACTION_NAME;
311-
transportRequest = new NodesFaultDetection.PingRequest(discoveryNode, ClusterName.CLUSTER_NAME_SETTING.get(settings),
312-
transportService.getLocalNode(), ClusterState.UNKNOWN_VERSION);
313-
} else {
314-
actionName = FOLLOWER_CHECK_ACTION_NAME;
315-
transportRequest = request;
316-
}
317-
transportService.sendRequest(discoveryNode, actionName, transportRequest,
300+
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
318301
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
319302
new TransportResponseHandler<Empty>() {
320303
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.elasticsearch.common.settings.Settings;
3838
import org.elasticsearch.common.unit.TimeValue;
3939
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
40-
import org.elasticsearch.discovery.zen.MembershipAction;
41-
import org.elasticsearch.discovery.zen.ZenDiscovery;
4240
import org.elasticsearch.threadpool.ThreadPool;
4341
import org.elasticsearch.threadpool.ThreadPool.Names;
4442
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@@ -117,11 +115,6 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
117115
transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new,
118116
(request, channel, task) -> joinHandler.accept(request, transportJoinCallback(request, channel)));
119117

120-
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME, MembershipAction.JoinRequest::new,
121-
ThreadPool.Names.GENERIC, false, false,
122-
(request, channel, task) -> joinHandler.accept(new JoinRequest(request.getNode(), Optional.empty()), // treat as non-voting join
123-
transportJoinCallback(request, channel)));
124-
125118
transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
126119
StartJoinRequest::new,
127120
(request, channel, task) -> {
@@ -143,21 +136,6 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
143136
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
144137
channel.sendResponse(Empty.INSTANCE);
145138
});
146-
147-
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
148-
ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
149-
(request, channel, task) -> {
150-
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
151-
channel.sendResponse(Empty.INSTANCE);
152-
});
153-
154-
transportService.registerRequestHandler(
155-
ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME,
156-
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
157-
158-
transportService.registerRequestHandler(
159-
MembershipAction.DISCOVERY_LEAVE_ACTION_NAME, MembershipAction.LeaveRequest::new, ThreadPool.Names.SAME,
160-
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
161139
}
162140

163141
private JoinCallback transportJoinCallback(TransportRequest request, TransportChannel channel) {
@@ -200,16 +178,7 @@ public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJo
200178
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
201179
if (pendingOutgoingJoins.add(dedupKey)) {
202180
logger.debug("attempting to join {} with {}", destination, joinRequest);
203-
final String actionName;
204-
final TransportRequest transportRequest;
205-
if (Coordinator.isZen1Node(destination)) {
206-
actionName = MembershipAction.DISCOVERY_JOIN_ACTION_NAME;
207-
transportRequest = new MembershipAction.JoinRequest(transportService.getLocalNode());
208-
} else {
209-
actionName = JOIN_ACTION_NAME;
210-
transportRequest = joinRequest;
211-
}
212-
transportService.sendRequest(destination, actionName, transportRequest,
181+
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
213182
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
214183
new TransportResponseHandler<Empty>() {
215184
@Override
@@ -269,13 +238,7 @@ public String executor() {
269238
}
270239

271240
public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
272-
final String actionName;
273-
if (Coordinator.isZen1Node(node)) {
274-
actionName = MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME;
275-
} else {
276-
actionName = VALIDATE_JOIN_ACTION_NAME;
277-
}
278-
transportService.sendRequest(node, actionName,
241+
transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME,
279242
new ValidateJoinRequest(state),
280243
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
281244
new EmptyTransportResponseHandler(ThreadPool.Names.GENERIC) {

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.elasticsearch.cluster.node.DiscoveryNodes;
3131
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3232
import org.elasticsearch.common.settings.Settings;
33-
import org.elasticsearch.discovery.zen.ElectMasterService;
3433
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3534

3635
import java.util.ArrayList;
@@ -47,8 +46,6 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
4746

4847
private final Logger logger;
4948

50-
private final int minimumMasterNodesOnLocalNode;
51-
5249
public static class Task {
5350

5451
private final DiscoveryNode node;
@@ -87,7 +84,6 @@ public boolean isFinishElectionTask() {
8784
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger) {
8885
this.allocationService = allocationService;
8986
this.logger = logger;
90-
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
9187
}
9288

9389
@Override
@@ -191,7 +187,6 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
191187
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
192188
.blocks(currentState.blocks())
193189
.removeGlobalBlock(NoMasterBlockService.NO_MASTER_BLOCK_ID))
194-
.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
195190
.build();
196191
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
197192
allocationService.cleanCaches();

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
25-
import org.elasticsearch.cluster.ClusterName;
2625
import org.elasticsearch.cluster.node.DiscoveryNode;
2726
import org.elasticsearch.cluster.node.DiscoveryNodes;
2827
import org.elasticsearch.common.Nullable;
@@ -32,7 +31,6 @@
3231
import org.elasticsearch.common.settings.Setting;
3332
import org.elasticsearch.common.settings.Settings;
3433
import org.elasticsearch.common.unit.TimeValue;
35-
import org.elasticsearch.discovery.zen.MasterFaultDetection;
3634
import org.elasticsearch.threadpool.ThreadPool.Names;
3735
import org.elasticsearch.transport.ConnectTransportException;
3836
import org.elasticsearch.transport.TransportConnectionListener;
@@ -103,16 +101,6 @@ public LeaderChecker(final Settings settings, final TransportService transportSe
103101
channel.sendResponse(Empty.INSTANCE);
104102
});
105103

106-
transportService.registerRequestHandler(MasterFaultDetection.MASTER_PING_ACTION_NAME, MasterFaultDetection.MasterPingRequest::new,
107-
Names.SAME, false, false, (request, channel, task) -> {
108-
try {
109-
handleLeaderCheck(new LeaderCheckRequest(request.sourceNode));
110-
} catch (CoordinationStateRejectedException e) {
111-
throw new MasterFaultDetection.ThisIsNotTheMasterYouAreLookingForException(e.getMessage());
112-
}
113-
channel.sendResponse(new MasterFaultDetection.MasterPingResponseResponse());
114-
});
115-
116104
transportService.addConnectionListener(new TransportConnectionListener() {
117105
@Override
118106
public void onNodeDisconnected(DiscoveryNode node) {
@@ -217,21 +205,7 @@ void handleWakeUp() {
217205

218206
logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout);
219207

220-
final String actionName;
221-
final TransportRequest transportRequest;
222-
if (Coordinator.isZen1Node(leader)) {
223-
actionName = MasterFaultDetection.MASTER_PING_ACTION_NAME;
224-
transportRequest = new MasterFaultDetection.MasterPingRequest(
225-
transportService.getLocalNode(), leader, ClusterName.CLUSTER_NAME_SETTING.get(settings));
226-
} else {
227-
actionName = LEADER_CHECK_ACTION_NAME;
228-
transportRequest = new LeaderCheckRequest(transportService.getLocalNode());
229-
}
230-
// TODO lag detection:
231-
// In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower
232-
// could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just
233-
// TransportResponse.Empty here.
234-
transportService.sendRequest(leader, actionName, transportRequest,
208+
transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
235209
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
236210

237211
new TransportResponseHandler<TransportResponse.Empty>() {

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,10 @@
4040
import org.elasticsearch.common.io.stream.StreamOutput;
4141
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4242
import org.elasticsearch.core.internal.io.IOUtils;
43-
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
4443
import org.elasticsearch.threadpool.ThreadPool;
4544
import org.elasticsearch.transport.BytesTransportRequest;
4645
import org.elasticsearch.transport.TransportChannel;
4746
import org.elasticsearch.transport.TransportException;
48-
import org.elasticsearch.transport.TransportRequest;
4947
import org.elasticsearch.transport.TransportRequestOptions;
5048
import org.elasticsearch.transport.TransportResponse;
5149
import org.elasticsearch.transport.TransportResponseHandler;
@@ -54,7 +52,6 @@
5452
import java.io.IOException;
5553
import java.util.HashMap;
5654
import java.util.Map;
57-
import java.util.Optional;
5855
import java.util.concurrent.atomic.AtomicLong;
5956
import java.util.concurrent.atomic.AtomicReference;
6057
import java.util.function.BiConsumer;
@@ -194,16 +191,7 @@ public String toString() {
194191
@Override
195192
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
196193
ActionListener<TransportResponse.Empty> responseActionListener) {
197-
final String actionName;
198-
final TransportRequest transportRequest;
199-
if (Coordinator.isZen1Node(destination)) {
200-
actionName = PublishClusterStateAction.COMMIT_ACTION_NAME;
201-
transportRequest = new PublishClusterStateAction.CommitClusterStateRequest(newState.stateUUID());
202-
} else {
203-
actionName = COMMIT_STATE_ACTION_NAME;
204-
transportRequest = applyCommitRequest;
205-
}
206-
transportService.sendRequest(destination, actionName, transportRequest, stateRequestOptions,
194+
transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, stateRequestOptions,
207195
new TransportResponseHandler<TransportResponse.Empty>() {
208196

209197
@Override
@@ -267,19 +255,7 @@ public String executor() {
267255
return ThreadPool.Names.GENERIC;
268256
}
269257
};
270-
final String actionName;
271-
final TransportResponseHandler<?> transportResponseHandler;
272-
if (Coordinator.isZen1Node(node)) {
273-
actionName = PublishClusterStateAction.SEND_ACTION_NAME;
274-
transportResponseHandler = publishWithJoinResponseHandler.wrap(empty -> new PublishWithJoinResponse(
275-
new PublishResponse(clusterState.term(), clusterState.version()),
276-
Optional.of(new Join(node, transportService.getLocalNode(), clusterState.term(), clusterState.term(),
277-
clusterState.version()))), in -> TransportResponse.Empty.INSTANCE);
278-
} else {
279-
actionName = PUBLISH_STATE_ACTION_NAME;
280-
transportResponseHandler = publishWithJoinResponseHandler;
281-
}
282-
transportService.sendRequest(node, actionName, request, stateRequestOptions, transportResponseHandler);
258+
transportService.sendRequest(node, PUBLISH_STATE_ACTION_NAME, request, stateRequestOptions, publishWithJoinResponseHandler);
283259
} catch (Exception e) {
284260
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
285261
responseActionListener.onFailure(e);

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@
6868
import org.elasticsearch.discovery.PeerFinder;
6969
import org.elasticsearch.discovery.SeedHostsResolver;
7070
import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider;
71-
import org.elasticsearch.discovery.zen.ElectMasterService;
72-
import org.elasticsearch.discovery.zen.FaultDetection;
73-
import org.elasticsearch.discovery.zen.ZenDiscovery;
7471
import org.elasticsearch.env.Environment;
7572
import org.elasticsearch.env.NodeEnvironment;
7673
import org.elasticsearch.gateway.GatewayService;
@@ -291,7 +288,6 @@ public void apply(Settings value, Settings current, Settings previous) {
291288
ClusterService.USER_DEFINED_META_DATA,
292289
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
293290
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
294-
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
295291
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
296292
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
297293
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
@@ -390,20 +386,6 @@ public void apply(Settings value, Settings current, Settings previous) {
390386
DiscoveryModule.DISCOVERY_TYPE_SETTING,
391387
DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING,
392388
DiscoveryModule.LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING,
393-
FaultDetection.PING_RETRIES_SETTING,
394-
FaultDetection.PING_TIMEOUT_SETTING,
395-
FaultDetection.REGISTER_CONNECTION_LISTENER_SETTING,
396-
FaultDetection.PING_INTERVAL_SETTING,
397-
FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING,
398-
ZenDiscovery.PING_TIMEOUT_SETTING,
399-
ZenDiscovery.JOIN_TIMEOUT_SETTING,
400-
ZenDiscovery.JOIN_RETRY_ATTEMPTS_SETTING,
401-
ZenDiscovery.JOIN_RETRY_DELAY_SETTING,
402-
ZenDiscovery.MAX_PINGS_FROM_ANOTHER_MASTER_SETTING,
403-
ZenDiscovery.SEND_LEAVE_REQUEST_SETTING,
404-
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
405-
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
406-
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
407389
SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING,
408390
SettingsBasedSeedHostsProvider.LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
409391
SeedHostsResolver.DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING,

server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
package org.elasticsearch.discovery;
2121

22-
import org.apache.logging.log4j.Logger;
2322
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.coordination.Coordinator;
2626
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -37,7 +37,6 @@
3737
import org.elasticsearch.common.settings.Settings;
3838
import org.elasticsearch.common.transport.TransportAddress;
3939
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
40-
import org.elasticsearch.discovery.zen.ZenDiscovery;
4140
import org.elasticsearch.gateway.GatewayMetaState;
4241
import org.elasticsearch.plugins.DiscoveryPlugin;
4342
import org.elasticsearch.threadpool.ThreadPool;
@@ -128,9 +127,6 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
128127
};
129128

130129
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
131-
discoveryTypes.put(ZEN_DISCOVERY_TYPE,
132-
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
133-
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState));
134130
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings,
135131
transportService, namedWriteableRegistry, allocationService, masterService,
136132
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, clusterApplier,

0 commit comments

Comments
 (0)