Skip to content

Commit

Permalink
Merge branch 'fix_ip_replace' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Royer committed Jun 5, 2018
2 parents b47d9d8 + 99e8072 commit 4ed93b0
Showing 1 changed file with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.elassandra.discovery;

import com.google.common.collect.Maps;
import com.google.common.net.InetAddresses;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
Expand Down Expand Up @@ -53,6 +52,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
Expand Down Expand Up @@ -128,7 +128,7 @@ public CassandraDiscovery(Settings settings, TransportService transportService,
this.clusterName = clusterService.getClusterName();

this.localAddress = FBUtilities.getBroadcastAddress();
this.localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localAddress);
this.localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());

this.clusterGroup = new ClusterGroup();
}
Expand Down Expand Up @@ -542,7 +542,7 @@ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValu
switch (state) {
case STATUS:
if (logger.isTraceEnabled())
logger.trace("onChange Endpoint={} ApplicationState={} value={}", endpoint, state, versionValue);
logger.trace("Endpoint={} ApplicationState={} value={}", endpoint, state, versionValue);
if (isNormal(epState)) {
updateNode(endpoint, epState, DiscoveryNodeStatus.ALIVE);
} else {
Expand All @@ -568,6 +568,17 @@ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValu
logger.warn("Failed to parse gossip index shard state", e);
}
break;

case RPC_ADDRESS: // manage ip rpc_address replacement from a remote node
if (logger.isTraceEnabled())
logger.trace("Endpoint={} ApplicationState={} value={}", endpoint, state, versionValue);
InetAddress newRpcAddress = InetAddresses.forString(versionValue.value);
String hostId = epState.getApplicationState(ApplicationState.HOST_ID).value;
if (clusterGroup.needUpdate(hostId, newRpcAddress)) {
logger.info("Node HOST_ID={} change RPC_ADDRESS={}", hostId, newRpcAddress);
updateNode(endpoint, epState, DiscoveryNodeStatus.ALIVE);
}
break;
}
}

Expand Down Expand Up @@ -628,7 +639,7 @@ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValu
@Override
public void onAlive(InetAddress endpoint, EndpointState epState) {
if (isMember(endpoint)) {
logger.debug("onAlive Endpoint={} ApplicationState={} isAlive={} => update node + connecting", endpoint, epState, epState.isAlive());
logger.debug("Endpoint={} ApplicationState={} isAlive={} => update node + connecting", endpoint, epState, epState.isAlive());
if (isNormal(epState))
updateNode(endpoint, epState, DiscoveryNodeStatus.ALIVE);
}
Expand All @@ -637,7 +648,7 @@ public void onAlive(InetAddress endpoint, EndpointState epState) {
@Override
public void onDead(InetAddress endpoint, EndpointState epState) {
if (isMember(endpoint)) {
logger.debug("onDead Endpoint={} ApplicationState={} isAlive={} => update node + disconnecting", endpoint, epState, epState.isAlive());
logger.debug("Endpoint={} ApplicationState={} isAlive={} => update node + disconnecting", endpoint, epState, epState.isAlive());
if (this.metaDataVersionAckListener.get() != null) {
notifyMetaDataVersionAckListener(Gossiper.instance.getEndpointStateForEndpoint(endpoint));
}
Expand All @@ -649,7 +660,7 @@ public void onDead(InetAddress endpoint, EndpointState epState) {
public void onRestart(InetAddress endpoint, EndpointState epState) {
if (isMember(endpoint)) {
if (logger.isTraceEnabled())
logger.debug("onRestart Endpoint={} ApplicationState={} isAlive={} status={}", endpoint, epState, epState.isAlive());
logger.debug("Endpoint={} ApplicationState={} isAlive={} status={}", endpoint, epState, epState.isAlive());
if (isNormal(epState))
updateNode(endpoint, epState, DiscoveryNodeStatus.ALIVE);
}
Expand All @@ -659,7 +670,7 @@ public void onRestart(InetAddress endpoint, EndpointState epState) {
public void onJoin(InetAddress endpoint, EndpointState epState) {
if (isLocal(endpoint)) {
if (logger.isTraceEnabled())
logger.trace("onJoin Endpoint={} ApplicationState={} isAlive={} status={}", endpoint, epState, epState.isAlive(), epState.getStatus() );
logger.trace("Endpoint={} ApplicationState={} isAlive={} status={}", endpoint, epState, epState.isAlive(), epState.getStatus() );
if (isNormal(epState))
updateNode(endpoint, epState, DiscoveryNodeStatus.ALIVE);
}
Expand Down Expand Up @@ -822,6 +833,11 @@ public boolean contains(String id) {
return members.containsKey(id);
}

public boolean needUpdate(String id, InetAddress rpcAddess) {
DiscoveryNode n = members.get(id);
return n != null && n.getInetAddress().equals(rpcAddess);
}

public Collection<DiscoveryNode> values() {
return members.values();
}
Expand Down

0 comments on commit 4ed93b0

Please sign in to comment.