Skip to content

Commit

Permalink
Remove DiscoveryService and reduce guice to just Discovery elastic#16821
Browse files Browse the repository at this point in the history


DiscoveryService was a bridge into the discovery universe. This is unneeded and we can just access discovery directly or do things in a different way.

One of those different ways, is not having a dedicated discovery implementation for each our dicovery plugins but rather reuse ZenDiscovery.

UnicastHostProviders are now classified by discovery type, removing unneeded checks on plugins.

Closes elastic#16821
  • Loading branch information
bleskes committed Feb 29, 2016
1 parent 26863a4 commit 195b43d
Show file tree
Hide file tree
Showing 37 changed files with 254 additions and 543 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/
void removeInitialStateBlock(ClusterBlock block) throws IllegalStateException;

/**
* Remove an initial block to be set on the first cluster state created.
*/
void removeInitialStateBlock(int blockId) throws IllegalStateException;

/**
* The operation routing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.local.LocalDiscovery;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;

Expand All @@ -70,7 +69,7 @@
* and cluster state {@link #status}, which is updated during cluster state publishing and applying
* processing. The cluster state can be updated only on the master node. All updates are performed by on a
* single thread and controlled by the {@link InternalClusterService}. After every update the
* {@link DiscoveryService#publish} method publishes new version of the cluster state to all other nodes in the
* {@link Discovery#publish} method publishes new version of the cluster state to all other nodes in the
* cluster. The actual publishing mechanism is delegated to the {@link Discovery#publish} method and depends on
* the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish}
* method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ public Builder removeGlobalBlock(ClusterBlock block) {
return this;
}

public Builder removeGlobalBlock(int blockId) {
global.removeIf(block -> block.id() == blockId);
return this;
}


public Builder addIndexBlock(String index, ClusterBlock block) {
if (!indices.containsKey(index)) {
indices.put(index, new HashSet<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -64,7 +65,6 @@
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -76,14 +76,17 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
Expand All @@ -97,9 +100,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
public static final Setting<TimeValue> CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING = Setting.positiveTimeSetting("cluster.service.reconnect_interval", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER);

public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
public static final Setting<Long> NODE_ID_SEED_SETTING =
// don't use node.id.seed so it won't be seen as an attribute
Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, false, Setting.Scope.CLUSTER);
private final ThreadPool threadPool;

private final DiscoveryService discoveryService;
private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;

private final OperationRouting operationRouting;

Expand Down Expand Up @@ -139,12 +145,11 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private volatile ScheduledFuture reconnectToNodes;

@Inject
public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService,
public InternalClusterService(Settings settings, OperationRouting operationRouting, TransportService transportService,
ClusterSettings clusterSettings, ThreadPool threadPool, ClusterName clusterName, DiscoveryNodeService discoveryNodeService, Version version) {
super(settings);
this.operationRouting = operationRouting;
this.transportService = transportService;
this.discoveryService = discoveryService;
this.threadPool = threadPool;
this.clusterSettings = clusterSettings;
this.discoveryNodeService = discoveryNodeService;
Expand All @@ -161,7 +166,7 @@ public InternalClusterService(Settings settings, DiscoveryService discoveryServi

localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);

initialBlocks = ClusterBlocks.builder().addGlobalBlock(discoveryService.getNoMasterBlock());
initialBlocks = ClusterBlocks.builder();

taskManager = transportService.getTaskManager();
}
Expand All @@ -170,6 +175,10 @@ private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
}

public void setClusterStatePublisher(BiConsumer<ClusterChangedEvent, Discovery.AckListener> publisher) {
clusterStatePublisher = publisher;
}

@Override
public void addInitialStateBlock(ClusterBlock block) throws IllegalStateException {
if (lifecycle.started()) {
Expand All @@ -180,22 +189,28 @@ public void addInitialStateBlock(ClusterBlock block) throws IllegalStateExceptio

@Override
public void removeInitialStateBlock(ClusterBlock block) throws IllegalStateException {
removeInitialStateBlock(block.id());
}

@Override
public void removeInitialStateBlock(int blockId) throws IllegalStateException {
if (lifecycle.started()) {
throw new IllegalStateException("can't set initial block when started");
}
initialBlocks.removeGlobalBlock(block);
initialBlocks.removeGlobalBlock(blockId);
}

@Override
protected void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
add(localNodeMasterListeners);
add(taskManager);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext());
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
final String nodeId = DiscoveryService.generateNodeId(settings);
final String nodeId = generateNodeId(settings);
final TransportAddress publishAddress = transportService.boundAddress().publishAddress();
DiscoveryNode localNode = new DiscoveryNode(settings.get("node.name"), nodeId, publishAddress, nodeAttributes, version);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id());
Expand Down Expand Up @@ -572,7 +587,7 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
if (newClusterState.nodes().localNodeMaster()) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
try {
discoveryService.publish(clusterChangedEvent, ackListener);
clusterStatePublisher.accept(clusterChangedEvent, ackListener);
} catch (Discovery.FailedToCommitClusterStateException t) {
logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version());
proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t));
Expand Down Expand Up @@ -853,6 +868,11 @@ public void run() {
}
}

public static String generateNodeId(Settings settings) {
Random random = Randomness.get(settings, NODE_ID_SEED_SETTING);
return Strings.randomBase64UUID(random);
}

private boolean nodeRequiresConnection(DiscoveryNode node) {
return localNode().shouldConnectTo(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* setting a reproducible seed. When running the Elasticsearch server
* process, non-reproducible sources of randomness are provided (unless
* a setting is provided for a module that exposes a seed setting (e.g.,
* DiscoveryService#DISCOVERY_SEED_SETTING)).
* DiscoveryService#NODE_ID_SEED_SETTING)).
*/
public final class Randomness {
private static final Method currentMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
Expand All @@ -62,11 +61,11 @@
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.HunspellService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
Expand Down Expand Up @@ -324,8 +323,8 @@ public void apply(Settings value, Settings current, Settings previous) {
Environment.PATH_SCRIPTS_SETTING,
Environment.PATH_SHARED_DATA_SETTING,
Environment.PIDFILE_SETTING,
DiscoveryService.DISCOVERY_SEED_SETTING,
DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING,
InternalClusterService.NODE_ID_SEED_SETTING,
DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING,
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.ZEN_MASTER_SERVICE_TYPE_SETTING,
FaultDetection.PING_RETRIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ public interface Discovery extends LifecycleComponent<Discovery> {

DiscoveryNode localNode();

void addListener(InitialStateDiscoveryListener listener);

void removeListener(InitialStateDiscoveryListener listener);

String nodeDescription();

/**
Expand Down Expand Up @@ -93,13 +89,13 @@ public FailedToCommitClusterStateException(String msg, Throwable cause, Object..
*/
DiscoveryStats stats();

DiscoverySettings getDiscoverySettings();

/**
* Triggers the first join cycle
*/
void startInitialJoin();


/***
* @return the current value of minimum master nodes, or -1 for not set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -50,7 +51,7 @@ public class DiscoveryModule extends AbstractModule {
"zen", Function.identity(), false, Setting.Scope.CLUSTER);

private final Settings settings;
private final List<Class<? extends UnicastHostsProvider>> unicastHostProviders = new ArrayList<>();
private final Map<String, List<Class<? extends UnicastHostsProvider>>> unicastHostProviders = new HashMap<>();
private final ExtensionPoint.ClassSet<ZenPing> zenPings = new ExtensionPoint.ClassSet<>("zen_ping", ZenPing.class);
private final Map<String, Class<? extends Discovery>> discoveryTypes = new HashMap<>();
private final Map<String, Class<? extends ElectMasterService>> masterServiceType = new HashMap<>();
Expand All @@ -66,9 +67,17 @@ public DiscoveryModule(Settings settings) {

/**
* Adds a custom unicast hosts provider to build a dynamic list of unicast hosts list when doing unicast discovery.
*
* @param type discovery for which this provider is relevant
* @param unicastHostProvider the host provider
*/
public void addUnicastHostProvider(Class<? extends UnicastHostsProvider> unicastHostProvider) {
unicastHostProviders.add(unicastHostProvider);
public void addUnicastHostProvider(String type, Class<? extends UnicastHostsProvider> unicastHostProvider) {
List<Class<? extends UnicastHostsProvider>> providerList = unicastHostProviders.get(type);
if (providerList == null) {
providerList = new ArrayList<>();
unicastHostProviders.put(type, providerList);
}
providerList.add(unicastHostProvider);
}

/**
Expand Down Expand Up @@ -116,12 +125,12 @@ protected void configure() {
}
bind(ZenPingService.class).asEagerSingleton();
Multibinder<UnicastHostsProvider> unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class);
for (Class<? extends UnicastHostsProvider> unicastHostProvider : unicastHostProviders) {
for (Class<? extends UnicastHostsProvider> unicastHostProvider :
unicastHostProviders.getOrDefault(discoveryType, Collections.emptyList())) {
unicastHostsProviderMultibinder.addBinding().to(unicastHostProvider);
}
zenPings.bind(binder());
}
bind(Discovery.class).to(discoveryClass).asEagerSingleton();
bind(DiscoveryService.class).asEagerSingleton();
}
}
Loading

0 comments on commit 195b43d

Please sign in to comment.