Skip to content

Commit

Permalink
CLOUD-974 JGroups 4 support
Browse files Browse the repository at this point in the history
fixes #18
  • Loading branch information
slaskawi committed Mar 14, 2017
1 parent dae1dd8 commit f0568c5
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 32 deletions.
45 changes: 42 additions & 3 deletions common/src/main/java/org/jgroups/ping/common/OpenshiftPing.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.annotations.Property;
import org.jgroups.protocols.PING;
import org.jgroups.ping.common.server.Server;
import org.jgroups.ping.common.server.ServerFactory;
import org.jgroups.ping.common.server.Servers;
import org.jgroups.protocols.PING;
import org.openshift.ping.common.compatibility.CompatibilityException;
import org.openshift.ping.common.compatibility.CompatibilityUtils;

public abstract class OpenshiftPing extends PING {

Expand Down Expand Up @@ -63,9 +67,20 @@ public abstract class OpenshiftPing extends PING {
private Server _server;
private String _serverName;

private static Method sendMethod; //handled via reflection due to JGroups 3/4 incompatibility

public OpenshiftPing(String systemEnvPrefix) {
super();
_systemEnvPrefix = trimToNull(systemEnvPrefix);
try {
if(CompatibilityUtils.isJGroups4()) {
sendMethod = this.getClass().getMethod("up", Message.class);
} else {
sendMethod = this.getClass().getMethod("up", Event.class);
}
} catch (Exception e) {
throw new CompatibilityException("Could not find suitable 'up' method.", e);
}
}

protected final String getSystemEnvName(String systemEnvSuffix) {
Expand Down Expand Up @@ -180,25 +195,49 @@ protected void sendMcastDiscoveryRequest(Message msg) {
return;
}
if (msg.getSrc() == null) {
msg.setSrc(local_addr);
setMessageSourceAddress(msg);

}
for (InetSocketAddress node : nodes) {
// forward the request to each node
timer.execute(new SendDiscoveryRequest(node, msg));
}
}

private void setMessageSourceAddress(Message msg) {
//public void setSrc(Address new_src) {src_addr=new_src;} - JGroups 3.6.13
//public Message setSrc(Address new_src) {src_addr=new_src; return this;} - JGroups 4.0.1
//unfortunately need to use reflections
try {
msg.getClass().getMethod("setSrc", Address.class).invoke(msg, local_addr);
} catch (Exception e) {
log.warn("Setting local address for discovery failed.");
}
}

public void handlePingRequest(InputStream stream) throws Exception {
DataInputStream dataInput = new DataInputStream(stream);
Message msg = new Message();
msg.readFrom(dataInput);
try {
up(new Event(Event.MSG, msg));
sendUp(msg);
} catch (Exception e) {
log.error("Error processing GET_MBRS_REQ.", e);
}
}

private void sendUp(Message msg) {
try {
if(CompatibilityUtils.isJGroups4()) {
sendMethod.invoke(this, msg);
} else {
sendMethod.invoke(this, new Event(1, msg));
}
} catch (Exception e) {
throw new CompatibilityException("Could not invoke 'up' method.", e);
}
}

private List<InetSocketAddress> readAll() {
if (isClusteringEnabled()) {
return doReadAll(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.Map;

import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.ping.common.OpenshiftPing;

Expand All @@ -31,13 +30,13 @@
public abstract class AbstractServer implements Server {

protected final int port;
protected final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();
protected final Map<String, JChannel> CHANNELS = new HashMap<>();

protected AbstractServer(int port) {
this.port = port;
}

public final Channel getChannel(String clusterName) {
public final JChannel getChannel(String clusterName) {
if (clusterName != null) {
synchronized (CHANNELS) {
return CHANNELS.get(clusterName);
Expand All @@ -46,7 +45,7 @@ public final Channel getChannel(String clusterName) {
return null;
}

protected final void addChannel(Channel channel) {
protected final void addChannel(JChannel channel) {
String clusterName = getClusterName(channel);
if (clusterName != null) {
synchronized (CHANNELS) {
Expand All @@ -55,7 +54,7 @@ protected final void addChannel(Channel channel) {
}
}

protected final void removeChannel(Channel channel) {
protected final void removeChannel(JChannel channel) {
String clusterName = getClusterName(channel);
if (clusterName != null) {
synchronized (CHANNELS) {
Expand All @@ -70,11 +69,11 @@ protected final boolean hasChannels() {
}
}

private String getClusterName(final Channel channel) {
private String getClusterName(final JChannel channel) {
if (channel != null) {
String clusterName = channel.getClusterName();
// clusterName will be null if the Channel is not yet connected, but we still need it!
if (clusterName == null && channel instanceof JChannel) {
if (clusterName == null) {
try {
Field field = JChannel.class.getDeclaredField("cluster_name");
field.setAccessible(true);
Expand All @@ -85,7 +84,7 @@ private String getClusterName(final Channel channel) {
return null;
}

protected final void handlePingRequest(Channel channel, InputStream stream) throws Exception {
protected final void handlePingRequest(JChannel channel, InputStream stream) throws Exception {
if (channel != null) {
OpenshiftPing handler = (OpenshiftPing) channel.getProtocolStack().findProtocol(OpenshiftPing.class);
handler.handlePingRequest(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jgroups.Channel;
import org.jgroups.JChannel;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
Expand All @@ -38,7 +38,7 @@ public JDKServer(int port) {
super(port);
}

public synchronized boolean start(Channel channel) throws Exception {
public synchronized boolean start(JChannel channel) throws Exception {
boolean started = false;
if (server == null) {
try {
Expand All @@ -57,7 +57,7 @@ public synchronized boolean start(Channel channel) throws Exception {
return started;
}

public synchronized boolean stop(Channel channel) {
public synchronized boolean stop(JChannel channel) {
boolean stopped = false;
removeChannel(channel);
if (server != null && !hasChannels()) {
Expand All @@ -82,7 +82,7 @@ public void handle(HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(200, 0);
try {
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
Channel channel = server.getChannel(clusterName);
JChannel channel = server.getChannel(clusterName);
try (InputStream stream = exchange.getRequestBody()) {
handlePingRequest(channel, stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package org.jgroups.ping.common.server;

import org.jgroups.Channel;
import org.jgroups.JChannel;

/**
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
*/
public interface Server {
public static final String CLUSTER_NAME = "CLUSTER_NAME";
public boolean start(Channel channel) throws Exception;
public boolean stop(Channel channel);
public Channel getChannel(String clusterName);
public boolean start(JChannel channel) throws Exception;
public boolean stop(JChannel channel);
public JChannel getChannel(String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package org.jgroups.ping.common.server;

import java.io.InputStream;

import org.jgroups.JChannel;

import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;

import java.io.InputStream;

import org.jgroups.Channel;

/**
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
*/
Expand All @@ -34,7 +34,7 @@ public UndertowServer(int port) {
super(port);
}

public synchronized boolean start(Channel channel) throws Exception {
public synchronized boolean start(JChannel channel) throws Exception {
boolean started = false;
if (server == null) {
try {
Expand All @@ -53,7 +53,7 @@ public synchronized boolean start(Channel channel) throws Exception {
return started;
}

public synchronized boolean stop(Channel channel) {
public synchronized boolean stop(JChannel channel) {
boolean stopped = false;
removeChannel(channel);
if (server != null && !hasChannels()) {
Expand Down Expand Up @@ -82,7 +82,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {

exchange.startBlocking();
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
Channel channel = server.getChannel(clusterName);
JChannel channel = server.getChannel(clusterName);
try (InputStream stream = exchange.getInputStream()) {
handlePingRequest(channel, stream);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.openshift.ping.common.compatibility;

/**
* Thrown on incompatibility errors
*
* @author Sebastian Łaskawiec
*/
public class CompatibilityException extends RuntimeException {

public CompatibilityException() {
}

public CompatibilityException(String message) {
super(message);
}

public CompatibilityException(String message, Throwable cause) {
super(message, cause);
}

public CompatibilityException(Throwable cause) {
super(cause);
}

public CompatibilityException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.openshift.ping.common.compatibility;

import org.jgroups.Version;

/**
* A small set of compatibility checking utils
*
* @author Sebastian Łaskawiec
*/
public class CompatibilityUtils {

private CompatibilityUtils() {
}

/**
* @return <code>true</code> when JGroups 4 is on the classpath. <code>false</code> otherwise.
*/
public static boolean isJGroups4() {
return Version.major == 4;
}
}
39 changes: 38 additions & 1 deletion kube/src/test/java/org/jgroups/ping/kube/test/PingTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

import org.jgroups.JChannel;
import org.jgroups.View;
import org.jgroups.util.Util;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -45,7 +48,7 @@ public void testRestart() throws Exception {
}

protected void doTestCluster() throws Exception {
Util.waitUntilAllChannelsHaveSameSize(10000, 1000, channels);
waitUntilAllChannelsHaveSameView(10000, 1000, channels);

// Tests unicasts from the first to the last
JChannel first = channels[0], last = channels[getNum() - 1];
Expand Down Expand Up @@ -96,4 +99,38 @@ protected void doTestCluster() throws Exception {
clearReceivers();
}

/**
* This method has been copied from JGroups. It changed name a couple of times, so we should
* have a safety copy here...
*/
public static void waitUntilAllChannelsHaveSameView(long timeout, long interval, JChannel... channels) throws TimeoutException {
if(interval >= timeout || timeout <= 0)
throw new IllegalArgumentException("interval needs to be smaller than timeout or timeout needs to be > 0");
long target_time=System.currentTimeMillis() + timeout;
while(System.currentTimeMillis() <= target_time) {
boolean all_channels_have_correct_view=true;
View first=channels[0].getView();
for(JChannel ch : channels) {
View view=ch.getView();
if(!Objects.equals(view, first) || view.size() != channels.length) {
all_channels_have_correct_view=false;
break;
}
}
if(all_channels_have_correct_view)
return;
Util.sleep(interval);
}
View[] views=new View[channels.length];
StringBuilder sb=new StringBuilder();
for(int i=0; i < channels.length; i++) {
views[i]=channels[i].getView();
sb.append(channels[i].getName()).append(": ").append(views[i]).append("\n");
}
View first=channels[0].getView();
for(View view : views)
if(!Objects.equals(view, first))
throw new TimeoutException("Timeout " + timeout + " kicked in, views are:\n" + sb);
}

}
Loading

0 comments on commit f0568c5

Please sign in to comment.