Skip to content

Commit

Permalink
zen disco: support for a node to act as a client (and not become mast…
Browse files Browse the repository at this point in the history
…er) using discovery.zen.master setting (default to true). It will automatically be set to false when node.client is set to true.
  • Loading branch information
kimchy committed Apr 25, 2010
1 parent 4ab298c commit 265e2fb
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,45 @@
package org.elasticsearch.cluster.node;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.TransportAddress;
import org.elasticsearch.util.transport.TransportAddressSerializers;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

import static org.elasticsearch.util.transport.TransportAddressSerializers.*;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class DiscoveryNode implements Streamable, Serializable {

public static Map<String, String> buildCommonNodesAttributes(Settings settings) {
Map<String, String> attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap());
if (attributes.containsKey("client")) {
if (attributes.get("client").equals("false")) {
attributes.remove("client"); // this is the default
} else {
// if we are client node, don't store data ...
attributes.put("data", "false");
}
}
if (attributes.containsKey("data")) {
if (attributes.get("data").equals("true")) {
attributes.remove("data");
}
}
return attributes;
}

public static final ImmutableList<DiscoveryNode> EMPTY_LIST = ImmutableList.of();

private String nodeName = StringHelper.intern("");
Expand All @@ -43,22 +67,26 @@ public class DiscoveryNode implements Streamable, Serializable {

private TransportAddress address;

private boolean dataNode = true;
private ImmutableMap<String, String> attributes;

private DiscoveryNode() {
}

public DiscoveryNode(String nodeId, TransportAddress address) {
this("", true, nodeId, address);
this("", nodeId, address, ImmutableMap.<String, String>of());
}

public DiscoveryNode(String nodeName, boolean dataNode, String nodeId, TransportAddress address) {
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map<String, String> attributes) {
if (nodeName == null) {
this.nodeName = StringHelper.intern("");
} else {
this.nodeName = StringHelper.intern(nodeName);
}
this.dataNode = dataNode;
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
builder.put(StringHelper.intern(entry.getKey()), StringHelper.intern(entry.getValue()));
}
this.attributes = builder.build();
this.nodeId = StringHelper.intern(nodeId);
this.address = address;
}
Expand Down Expand Up @@ -105,11 +133,26 @@ public String getName() {
return name();
}

/**
* The node attributes.
*/
public ImmutableMap<String, String> attributes() {
return this.attributes;
}

/**
* The node attributes.
*/
public ImmutableMap<String, String> getAttributes() {
return attributes();
}

/**
* Should this node hold data (shards) or not.
*/
public boolean dataNode() {
return dataNode;
String data = attributes.get("data");
return data == null || data.equals("true");
}

/**
Expand All @@ -119,6 +162,18 @@ public boolean isDataNode() {
return dataNode();
}

/**
* Is the node a client node or not.
*/
public boolean clientNode() {
String client = attributes.get("client");
return client != null && client.equals("true");
}

public boolean isClientNode() {
return clientNode();
}

public static DiscoveryNode readNode(StreamInput in) throws IOException {
DiscoveryNode node = new DiscoveryNode();
node.readFrom(in);
Expand All @@ -127,16 +182,25 @@ public static DiscoveryNode readNode(StreamInput in) throws IOException {

@Override public void readFrom(StreamInput in) throws IOException {
nodeName = StringHelper.intern(in.readUTF());
dataNode = in.readBoolean();
nodeId = StringHelper.intern(in.readUTF());
address = TransportAddressSerializers.addressFromStream(in);
int size = in.readVInt();
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
for (int i = 0; i < size; i++) {
builder.put(StringHelper.intern(in.readUTF()), StringHelper.intern(in.readUTF()));
}
attributes = builder.build();
}

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(nodeName);
out.writeBoolean(dataNode);
out.writeUTF(nodeId);
TransportAddressSerializers.addressToStream(out, address);
addressToStream(out, address);
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
}
}

@Override public boolean equals(Object obj) {
Expand All @@ -159,12 +223,12 @@ public static DiscoveryNode readNode(StreamInput in) throws IOException {
if (nodeId != null) {
sb.append('[').append(nodeId).append(']');
}
if (dataNode) {
sb.append("[data]");
}
if (address != null) {
sb.append('[').append(address).append(']');
}
if (!attributes.isEmpty()) {
sb.append(attributes);
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.google.common.collect.Maps.*;
import static com.google.common.collect.Sets.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;

/**
* @author kimchy (Shay Banon)
Expand Down Expand Up @@ -142,11 +143,11 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
channel.connect(clusterName.value());
channel.setReceiver(this);
logger.debug("Connected to cluster [{}], address [{}]", channel.getClusterName(), channel.getAddress());
this.localNode = new DiscoveryNode(settings.get("name"), settings.getAsBoolean("node.data", !settings.getAsBoolean("node.client", false)), channel.getAddress().toString(), transportService.boundAddress().publishAddress());
this.localNode = new DiscoveryNode(settings.get("name"), channel.getAddress().toString(), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings));

if (isMaster()) {
firstMaster = true;
clusterService.submitStateUpdateTask("jgroups-disco-initialconnect(master)", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("jgroups-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.localNodeId(localNode.id())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import static com.google.common.collect.Sets.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;

/**
* @author kimchy (Shay Banon)
Expand Down Expand Up @@ -84,14 +85,14 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
clusterGroups.put(clusterName, clusterGroup);
}
logger.debug("Connected to cluster [{}]", clusterName);
this.localNode = new DiscoveryNode(settings.get("name"), settings.getAsBoolean("node.data", !settings.getAsBoolean("node.client", false)), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress());
this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings));

clusterGroup.members().add(this);
if (clusterGroup.members().size() == 1) {
// we are the first master (and the master)
master = true;
firstMaster = true;
clusterService.submitStateUpdateTask("local-disco-initialconnect(master)", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.localNodeId(localNode.id())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import org.elasticsearch.util.settings.Settings;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.collect.Lists.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.util.TimeValue.*;

Expand All @@ -55,6 +57,8 @@
*/
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {

private final ThreadPool threadPool;

private final TransportService transportService;

private final ClusterService clusterService;
Expand Down Expand Up @@ -94,6 +98,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
ZenPingService pingService) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.pingService = pingService;
Expand All @@ -114,57 +119,29 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}

@Override protected void doStart() throws ElasticSearchException {
localNode = new DiscoveryNode(settings.get("name"), settings.getAsBoolean("node.data", !settings.getAsBoolean("node.client", false)), UUID.randomUUID().toString(), transportService.boundAddress().publishAddress());
Map<String, String> nodeAttributes = buildCommonNodesAttributes(settings);
Boolean zenMaster = componentSettings.getAsBoolean("master", null);
if (zenMaster != null) {
if (zenMaster.equals(Boolean.FALSE)) {
nodeAttributes.put("zen.master", "false");
}
} else if (nodeAttributes.containsKey("client")) {
if (nodeAttributes.get("client").equals("true")) {
nodeAttributes.put("zen.master", "false");
}
}
localNode = new DiscoveryNode(settings.get("name"), UUID.randomUUID().toString(), transportService.boundAddress().publishAddress(), nodeAttributes);
pingService.start();

boolean retry = true;
while (retry) {
retry = false;
DiscoveryNode masterNode = broadBingTillMasterResolved();
if (localNode.equals(masterNode)) {
// we are the master (first)
this.firstMaster = true;
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.localNodeId(localNode.id())
.masterNodeId(localNode.id())
// put our local node
.put(localNode);
// update the fact that we are the master...
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(builder).build();
}

@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
} else {
this.firstMaster = false;
this.master = false;
try {
// first, make sure we can connect to the master
transportService.connectToNode(masterNode);
} catch (Exception e) {
logger.warn("Failed to connect to master [{}], retrying...", e, masterNode);
retry = true;
continue;
}
// send join request
try {
membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout);
} catch (Exception e) {
logger.warn("Failed to send join request to master [{}], retrying...", e, masterNode);
// failed to send the join request, retry
retry = true;
continue;
if (nodeAttributes.containsKey("zen.master") && nodeAttributes.get("zen.master").equals("false")) {
// do the join on a different thread
threadPool.execute(new Runnable() {
@Override public void run() {
initialJoin();
}
// cool, we found a master, start an FD on it
masterFD.start(masterNode);
}
});
} else {
initialJoin();
}
}

Expand Down Expand Up @@ -239,6 +216,63 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
publishClusterState.publish(clusterState);
}

private void initialJoin() {
boolean retry = true;
while (retry) {
retry = false;
DiscoveryNode masterNode = broadPingTillMasterResolved();
if (localNode.equals(masterNode)) {
// we are the master (first)
this.firstMaster = true;
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.localNodeId(localNode.id())
.masterNodeId(localNode.id())
// put our local node
.put(localNode);
// update the fact that we are the master...
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(builder).build();
}

@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
} else {
this.firstMaster = false;
this.master = false;
try {
// first, make sure we can connect to the master
transportService.connectToNode(masterNode);
} catch (Exception e) {
logger.warn("Failed to connect to master [{}], retrying...", e, masterNode);
retry = true;
continue;
}
// send join request
try {
membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout);
} catch (Exception e) {
logger.warn("Failed to send join request to master [{}], retrying...", e, masterNode);
// failed to send the join request, retry
retry = true;
continue;
}
// cool, we found a master, start an FD on it
masterFD.start(masterNode);
}
if (retry) {
if (!lifecycle.started()) {
return;
}
}
}
}

private void handleNodeFailure(final DiscoveryNode node) {
if (!master) {
// nothing to do here...
Expand Down Expand Up @@ -365,7 +399,7 @@ private void handleJoinRequest(final DiscoveryNode node) {
}
}

private DiscoveryNode broadBingTillMasterResolved() {
private DiscoveryNode broadPingTillMasterResolved() {
while (true) {
ZenPing.PingResponse[] pingResponses = pingService.pingAndWait(initialPingTimeout);
List<DiscoveryNode> pingMasters = newArrayList();
Expand Down
Loading

0 comments on commit 265e2fb

Please sign in to comment.