Skip to content

Introduce zen2 discovery type #36298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -58,14 +61,19 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;

/**
* A module for loading classes for node discovery.
*/
public class DiscoveryModule {
private static final Logger logger = LogManager.getLogger(DiscoveryModule.class);

public static final String ZEN_DISCOVERY_TYPE = "zen";
public static final String ZEN2_DISCOVERY_TYPE = "zen2";

public static final Setting<String> DISCOVERY_TYPE_SETTING =
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
new Setting<>("discovery.type", ZEN_DISCOVERY_TYPE, Function.identity(), Property.NodeScope);
public static final Setting<List<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope);

Expand All @@ -75,14 +83,14 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
hostProviders.put("file", () -> new FileBasedUnicastHostsProvider(configFile));
for (DiscoveryPlugin plugin : plugins) {
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice");
plugin.getZenHostsProviders(transportService, networkService).forEach((key, value) -> {
if (hostProviders.put(key, value) != null) {
throw new IllegalArgumentException("Cannot register zen hosts provider [" + key + "] twice");
}
});
BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();
Expand Down Expand Up @@ -117,18 +125,21 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
};

Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
discoveryTypes.put(ZEN_DISCOVERY_TYPE,
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators), gatewayMetaState));
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, clusterApplier,
Randomness.get()));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState).entrySet()
.forEach(entry -> {
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
}
});
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings,
hostsProvider, allocationService, gatewayMetaState).forEach((key, value) -> {
if (discoveryTypes.put(key, value) != null) {
throw new IllegalArgumentException("Cannot register discovery type [" + key + "] twice");
}
});
}
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
Supplier<Discovery> discoverySupplier = discoveryTypes.get(discoveryType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -108,6 +111,17 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
incrementalWrite = false;
}

public PersistedState getPersistedState(Settings settings, ClusterApplierService clusterApplierService) {
applyClusterStateUpdaters();
if (DiscoveryNode.isMasterNode(settings) == false) {
// use Zen1 way of writing cluster state for non-master-eligible nodes
// this avoids concurrent manipulating of IndexMetadata with IndicesStore
clusterApplierService.addLowPriorityApplier(this);
return new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState());
}
return this;
}

private void initializeClusterState(ClusterName clusterName) throws IOException {
long startNS = System.nanoTime();
Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
import static org.elasticsearch.discovery.DiscoveryModule.ZEN_DISCOVERY_TYPE;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -101,7 +103,7 @@ public void testEnforceLimitsWhenBoundToNonLocalAddress() {
when(boundTransportAddress.boundAddresses()).thenReturn(transportAddresses.toArray(new TransportAddress[0]));
when(boundTransportAddress.publishAddress()).thenReturn(publishAddress);

final String discoveryType = randomFrom("zen", "single-node");
final String discoveryType = randomFrom(ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, "single-node");

assertEquals(BootstrapChecks.enforceLimits(boundTransportAddress, discoveryType), !"single-node".equals(discoveryType));
}
Expand All @@ -119,7 +121,7 @@ public void testEnforceLimitsWhenPublishingToNonLocalAddress() {
when(boundTransportAddress.boundAddresses()).thenReturn(transportAddresses.toArray(new TransportAddress[0]));
when(boundTransportAddress.publishAddress()).thenReturn(publishAddress);

final String discoveryType = randomFrom("zen", "single-node");
final String discoveryType = randomFrom(ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, "single-node");

assertEquals(BootstrapChecks.enforceLimits(boundTransportAddress, discoveryType), !"single-node".equals(discoveryType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@

package org.elasticsearch.test.discovery;

import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
Expand Down Expand Up @@ -81,20 +78,10 @@ public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool,
Settings fixedSettings = Settings.builder().put(settings).putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()).build();
return Collections.singletonMap("test-zen", () -> {
if (USE_ZEN2.get(settings)) {
Supplier<CoordinationState.PersistedState> persistedStateSupplier = () -> {
gatewayMetaState.applyClusterStateUpdaters();
if (DiscoveryNode.isMasterNode(settings) == false) {
// use Zen1 way of writing cluster state for non-master-eligible nodes
// this avoids concurrent manipulating of IndexMetadata with IndicesStore
((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState);
return new InMemoryPersistedState(gatewayMetaState.getCurrentTerm(), gatewayMetaState.getLastAcceptedState());
}
return gatewayMetaState;
};

return new Coordinator("test_node", fixedSettings, clusterSettings, transportService, namedWriteableRegistry,
allocationService, masterService, persistedStateSupplier, hostsProvider, clusterApplier,
new Random(Randomness.get().nextLong()));
allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider,
clusterApplier, new Random(Randomness.get().nextLong()));
} else {
return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService,
clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_FORMAT_SETTING;
import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
import static org.elasticsearch.discovery.DiscoveryModule.ZEN_DISCOVERY_TYPE;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_INDEX_NAME;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.INTERNAL_INDEX_FORMAT;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -281,7 +283,7 @@ public void testTLSJoinValidator() throws Exception {
int numIters = randomIntBetween(1, 10);
for (int i = 0; i < numIters; i++) {
boolean tlsOn = randomBoolean();
String discoveryType = randomFrom("single-node", "zen", randomAlphaOfLength(4));
String discoveryType = randomFrom("single-node", ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, randomAlphaOfLength(4));
Security.ValidateTLSOnJoin validator = new Security.ValidateTLSOnJoin(tlsOn, discoveryType);
MetaData.Builder builder = MetaData.builder();
License license = TestUtils.generateSignedLicense(TimeValue.timeValueHours(24));
Expand Down