Skip to content

Commit

Permalink
Revert "CLOUD-974 JGroups 4 support"
Browse files Browse the repository at this point in the history
This reverts commit 27c8ca6.
  • Loading branch information
slaskawi committed Mar 14, 2017
1 parent 5f03ab2 commit dae1dd8
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 147 deletions.
30 changes: 2 additions & 28 deletions common/src/main/java/org/jgroups/ping/common/OpenshiftPing.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
Expand All @@ -33,12 +32,10 @@
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.annotations.Property;
import org.jgroups.protocols.PING;
import org.jgroups.ping.common.server.Server;
import org.jgroups.ping.common.server.ServerFactory;
import org.jgroups.ping.common.server.Servers;
import org.jgroups.protocols.PING;
import org.openshift.ping.common.compatibility.CompatibilityException;
import org.openshift.ping.common.compatibility.CompatibilityUtils;

public abstract class OpenshiftPing extends PING {

Expand Down Expand Up @@ -66,20 +63,9 @@ public abstract class OpenshiftPing extends PING {
private Server _server;
private String _serverName;

private static Method sendMethod; //handled via reflection due to JGroups 3/4 incompatibility

public OpenshiftPing(String systemEnvPrefix) {
super();
_systemEnvPrefix = trimToNull(systemEnvPrefix);
try {
if(CompatibilityUtils.isJGroups4()) {
sendMethod = this.getClass().getMethod("up", Message.class);
} else {
sendMethod = this.getClass().getMethod("up", Event.class);
}
} catch (Exception e) {
throw new CompatibilityException("Could not find suitable 'up' method.", e);
}
}

protected final String getSystemEnvName(String systemEnvSuffix) {
Expand Down Expand Up @@ -207,24 +193,12 @@ public void handlePingRequest(InputStream stream) throws Exception {
Message msg = new Message();
msg.readFrom(dataInput);
try {
sendUp(msg);
up(new Event(Event.MSG, msg));
} catch (Exception e) {
log.error("Error processing GET_MBRS_REQ.", e);
}
}

private void sendUp(Message msg) {
try {
if(CompatibilityUtils.isJGroups4()) {
sendMethod.invoke(this, msg);
} else {
sendMethod.invoke(this, new Event(1, msg));
}
} catch (Exception e) {
throw new CompatibilityException("Could not invoke 'up' method.", e);
}
}

private List<InetSocketAddress> readAll() {
if (isClusteringEnabled()) {
return doReadAll(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Map;

import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.ping.common.OpenshiftPing;

Expand All @@ -30,13 +31,13 @@
public abstract class AbstractServer implements Server {

protected final int port;
protected final Map<String, JChannel> CHANNELS = new HashMap<>();
protected final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();

protected AbstractServer(int port) {
this.port = port;
}

public final JChannel getChannel(String clusterName) {
public final Channel getChannel(String clusterName) {
if (clusterName != null) {
synchronized (CHANNELS) {
return CHANNELS.get(clusterName);
Expand All @@ -45,7 +46,7 @@ public final JChannel getChannel(String clusterName) {
return null;
}

protected final void addChannel(JChannel channel) {
protected final void addChannel(Channel channel) {
String clusterName = getClusterName(channel);
if (clusterName != null) {
synchronized (CHANNELS) {
Expand All @@ -54,7 +55,7 @@ protected final void addChannel(JChannel channel) {
}
}

protected final void removeChannel(JChannel channel) {
protected final void removeChannel(Channel channel) {
String clusterName = getClusterName(channel);
if (clusterName != null) {
synchronized (CHANNELS) {
Expand All @@ -69,11 +70,11 @@ protected final boolean hasChannels() {
}
}

private String getClusterName(final JChannel channel) {
private String getClusterName(final Channel channel) {
if (channel != null) {
String clusterName = channel.getClusterName();
// clusterName will be null if the Channel is not yet connected, but we still need it!
if (clusterName == null) {
if (clusterName == null && channel instanceof JChannel) {
try {
Field field = JChannel.class.getDeclaredField("cluster_name");
field.setAccessible(true);
Expand All @@ -84,7 +85,7 @@ private String getClusterName(final JChannel channel) {
return null;
}

protected final void handlePingRequest(JChannel channel, InputStream stream) throws Exception {
protected final void handlePingRequest(Channel channel, InputStream stream) throws Exception {
if (channel != null) {
OpenshiftPing handler = (OpenshiftPing) channel.getProtocolStack().findProtocol(OpenshiftPing.class);
handler.handlePingRequest(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jgroups.JChannel;
import org.jgroups.Channel;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
Expand All @@ -38,7 +38,7 @@ public JDKServer(int port) {
super(port);
}

public synchronized boolean start(JChannel channel) throws Exception {
public synchronized boolean start(Channel channel) throws Exception {
boolean started = false;
if (server == null) {
try {
Expand All @@ -57,7 +57,7 @@ public synchronized boolean start(JChannel channel) throws Exception {
return started;
}

public synchronized boolean stop(JChannel channel) {
public synchronized boolean stop(Channel channel) {
boolean stopped = false;
removeChannel(channel);
if (server != null && !hasChannels()) {
Expand All @@ -82,7 +82,7 @@ public void handle(HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(200, 0);
try {
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
JChannel channel = server.getChannel(clusterName);
Channel channel = server.getChannel(clusterName);
try (InputStream stream = exchange.getRequestBody()) {
handlePingRequest(channel, stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package org.jgroups.ping.common.server;

import org.jgroups.JChannel;
import org.jgroups.Channel;

/**
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
*/
public interface Server {
public static final String CLUSTER_NAME = "CLUSTER_NAME";
public boolean start(JChannel channel) throws Exception;
public boolean stop(JChannel channel);
public JChannel getChannel(String clusterName);
public boolean start(Channel channel) throws Exception;
public boolean stop(Channel channel);
public Channel getChannel(String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package org.jgroups.ping.common.server;

import java.io.InputStream;

import org.jgroups.JChannel;

import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;

import java.io.InputStream;

import org.jgroups.Channel;

/**
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
*/
Expand All @@ -34,7 +34,7 @@ public UndertowServer(int port) {
super(port);
}

public synchronized boolean start(JChannel channel) throws Exception {
public synchronized boolean start(Channel channel) throws Exception {
boolean started = false;
if (server == null) {
try {
Expand All @@ -53,7 +53,7 @@ public synchronized boolean start(JChannel channel) throws Exception {
return started;
}

public synchronized boolean stop(JChannel channel) {
public synchronized boolean stop(Channel channel) {
boolean stopped = false;
removeChannel(channel);
if (server != null && !hasChannels()) {
Expand Down Expand Up @@ -82,7 +82,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {

exchange.startBlocking();
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
JChannel channel = server.getChannel(clusterName);
Channel channel = server.getChannel(clusterName);
try (InputStream stream = exchange.getInputStream()) {
handlePingRequest(channel, stream);
}
Expand Down

This file was deleted.

This file was deleted.

21 changes: 1 addition & 20 deletions kube/src/test/java/org/jgroups/ping/kube/test/PingTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,19 @@

package org.jgroups.ping.kube.test;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.util.Util;
import org.junit.Assert;
import org.junit.Test;
import org.openshift.ping.common.compatibility.CompatibilityException;
import org.openshift.ping.common.compatibility.CompatibilityUtils;

/**
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
*/
public abstract class PingTestBase extends TestBase {

//handles via reflection because of JGroups 3/4 incompatibility.
private static final Method waitForViewMethod;

static {
try {
if (CompatibilityUtils.isJGroups4()) {
waitForViewMethod = Util.class.getMethod("waitUntilAllChannelsHaveSameView", long.class, long.class, JChannel[].class);
} else {
waitForViewMethod = Util.class.getMethod("waitUntilAllChannelsHaveSameSize", long.class, long.class, Channel[].class);
}
} catch (NoSuchMethodException e) {
throw new CompatibilityException("Could not find proper 'waitUntilAllChannelsHaveSame*' method.", e);
}
}

@Test
public void testCluster() throws Exception {
doTestCluster();
Expand All @@ -64,7 +45,7 @@ public void testRestart() throws Exception {
}

protected void doTestCluster() throws Exception {
waitForViewMethod.invoke(null, 10000, 1000, channels);
Util.waitUntilAllChannelsHaveSameSize(10000, 1000, channels);

// Tests unicasts from the first to the last
JChannel first = channels[0], last = channels[getNum() - 1];
Expand Down
27 changes: 2 additions & 25 deletions kube/src/test/java/org/jgroups/ping/kube/test/ServerTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.lang.reflect.Constructor;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.PhysicalAddress;
import org.jgroups.View;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.ping.common.server.Server;
import org.jgroups.ping.kube.Client;
Expand All @@ -42,8 +39,6 @@
import org.jgroups.util.Util;
import org.junit.Assert;
import org.junit.Test;
import org.openshift.ping.common.compatibility.CompatibilityException;
import org.openshift.ping.common.compatibility.CompatibilityUtils;

/**
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
Expand Down Expand Up @@ -76,7 +71,8 @@ public void testResponse() throws Exception {
PhysicalAddress physical_addr = (PhysicalAddress) pinger
.down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
PingHeader hdr = new TestPingHeader();
PingData data = createPingData(local_addr, physical_addr);
PingData data = new PingData(local_addr, null, false, UUID.get(local_addr),
physical_addr != null ? Arrays.asList(physical_addr) : null);
Message msg = new Message(null).setFlag(Message.Flag.DONT_BUNDLE)
.putHeader(pinger.getId(), hdr).setBuffer(streamableToBuffer(data));

Expand All @@ -93,25 +89,6 @@ public void testResponse() throws Exception {
Assert.assertEquals(200, conn.getResponseCode());
}

/*
* Handled via reflection because of JGroups 3/4 incompatibility.
*/
private PingData createPingData(Address local_addr, PhysicalAddress physical_addr) {
try {
if(CompatibilityUtils.isJGroups4()) {
Constructor<PingData> constructor = PingData.class.getConstructor(Address.class, boolean.class);
return constructor.newInstance(local_addr, false);
} else {
String localAddressAsString = (String) UUID.class.getMethod("get", Address.class).invoke(null, local_addr);
Constructor<PingData> constructor = PingData.class.getConstructor(Address.class, View.class, boolean.class, String.class, Collection.class);;
return constructor.newInstance(local_addr, null, false, localAddressAsString,
physical_addr != null ? Arrays.asList(physical_addr) : null);
}
} catch (Exception e) {
throw new CompatibilityException("Could not find or invoke proper 'PingData' constructor");
}
}

private static Buffer streamableToBuffer(Streamable obj) {
final ByteArrayOutputStream out_stream = new ByteArrayOutputStream(512);
DataOutputStream out = new DataOutputStream(out_stream);
Expand Down
Loading

0 comments on commit dae1dd8

Please sign in to comment.