";
+ }
+ }
+
+ public final int serializedSize() {
+ int retval=Global.BYTE_SIZE; // type
+ switch(type) {
+ case DATA:
+ retval+=Bits.size(seqno) // seqno
+ + Global.SHORT_SIZE // conn_id
+ + Global.BYTE_SIZE; // first
+ break;
+ case ACK:
+ retval+=Bits.size(seqno)
+ + Global.SHORT_SIZE // conn_id
+ + Bits.size(timestamp);
+ break;
+ case SEND_FIRST_SEQNO:
+ retval+=Bits.size(timestamp);
+ break;
+ case XMIT_REQ:
+ break;
+ case CLOSE:
+ retval+=Global.SHORT_SIZE; // conn-id
+ break;
+ }
+ return retval;
+ }
+
+ public UnicastHeader copy() {
+ return new UnicastHeader(type, seqno, conn_id, first);
+ }
+
+ /**
+ * The following types and fields are serialized:
+ *
+ * | DATA | seqno | conn_id | first |
+ * | ACK | seqno | timestamp |
+ * | SEND_FIRST_SEQNO | timestamp |
+ * | CLOSE | conn_id |
+ *
+ */
+ @Override
+ public void writeTo(DataOutput out) throws IOException {
+ out.writeByte(type);
+ switch(type) {
+ case DATA:
+ Bits.writeLongCompressed(seqno, out);
+ out.writeShort(conn_id);
+ out.writeBoolean(first);
+ break;
+ case ACK:
+ Bits.writeLongCompressed(seqno, out);
+ out.writeShort(conn_id);
+ Bits.writeIntCompressed(timestamp, out);
+ break;
+ case SEND_FIRST_SEQNO:
+ Bits.writeIntCompressed(timestamp, out);
+ break;
+ case XMIT_REQ:
+ break;
+ case CLOSE:
+ out.writeShort(conn_id);
+ break;
+ }
+ }
+
+ @Override
+ public void readFrom(DataInput in) throws IOException {
+ type=in.readByte();
+ switch(type) {
+ case DATA:
+ seqno=Bits.readLongCompressed(in);
+ conn_id=in.readShort();
+ first=in.readBoolean();
+ break;
+ case ACK:
+ seqno=Bits.readLongCompressed(in);
+ conn_id=in.readShort();
+ timestamp=Bits.readIntCompressed(in);
+ break;
+ case SEND_FIRST_SEQNO:
+ timestamp=Bits.readIntCompressed(in);
+ break;
+ case XMIT_REQ:
+ break;
+ case CLOSE:
+ conn_id=in.readShort();
+ break;
+ }
+ }
+}
diff --git a/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java b/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java
index b4502b9321f..7c4b25d5358 100644
--- a/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java
+++ b/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java
@@ -93,8 +93,6 @@ public void handleMembershipChange(Collection requests) {
for(Request req: requests) {
switch(req.type) {
case Request.JOIN:
- new_mbrs.add(req.mbr);
- break;
case Request.JOIN_WITH_STATE_TRANSFER:
new_mbrs.add(req.mbr);
break;
diff --git a/src/org/jgroups/util/Average.java b/src/org/jgroups/util/Average.java
index 41960a0bb05..fff217a898c 100644
--- a/src/org/jgroups/util/Average.java
+++ b/src/org/jgroups/util/Average.java
@@ -53,8 +53,11 @@ public T add(long num) {
public T merge(T other) {
if(other == null)
return (T)this;
- for(int i=0; i < other.samples.length(); i++)
- add(other.samples.get(i));
+ for(int i=0; i < other.samples.length(); i++) {
+ Double el=other.samples.get(i);
+ if(el != null)
+ add(el);
+ }
return (T)this;
}
@@ -91,18 +94,29 @@ public String toString(TimeUnit u) {
@Override
public void writeTo(DataOutput out) throws IOException {
Bits.writeIntCompressed(samples.length(), out);
- for(int i=0; i < samples.length(); i++)
- Bits.writeDouble(samples.get(i), out);
+ for(int i=0; i < samples.length(); i++) {
+ Double sample=samples.get(i);
+ boolean not_null=sample != null;
+ out.writeBoolean(not_null);
+ if(not_null)
+ Bits.writeDouble(sample, out);
+ }
+ out.writeInt(index);
Bits.writeDouble(total.sum(), out);
+ Bits.writeLongCompressed(count.sum(), out);
}
@Override
public void readFrom(DataInput in) throws IOException {
int len=Bits.readIntCompressed(in);
samples=new AtomicReferenceArray<>(len);
- for(int i=0; i < samples.length(); i++)
- samples.set(i, Bits.readDouble(in));
+ for(int i=0; i < samples.length(); i++) {
+ if(in.readBoolean())
+ samples.set(i, Bits.readDouble(in));
+ }
+ index=in.readInt();
total.add(Bits.readDouble(in));
+ count.add(Bits.readLongCompressed(in));
}
protected int nextIndex() {
diff --git a/src/org/jgroups/util/Buffer.java b/src/org/jgroups/util/Buffer.java
index 5070bf5d753..23020f6827b 100644
--- a/src/org/jgroups/util/Buffer.java
+++ b/src/org/jgroups/util/Buffer.java
@@ -87,6 +87,18 @@ public boolean add(long seqno, T element) {
// used: MessageBatch received
public abstract boolean add(MessageBatch batch, Function seqno_getter, boolean remove_from_batch, T const_value);
+ /**
+ * Adds elements from the list
+ * @param list The list of tuples of seqnos and elements. If remove_added_elements is true, if elements could
+ * not be added (e.g. because they were already present or the seqno was < HD), those
+ * elements will be removed from list
+ * @param remove_added_elements If true, elements that could not be added to the table are removed from list
+ * @param const_value If non-null, this value should be used rather than the values of the list tuples
+ * @return True if at least 1 element was added successfully, false otherwise.
+ */
+ // used: MessageBatch received by UNICAST3/4
+ public abstract boolean add(final List> list, boolean remove_added_elements, T const_value);
+
// used: retransmision etc
public abstract T get(long seqno);
@@ -250,7 +262,7 @@ public String dump() {
@Override
public String toString() {
- return String.format("[%,d | %,d | %,d] (%,d elements, %,d missing)", low, hd, high, size, numMissing());
+ return String.format("[%,d | %,d | %,d] (size: %,d, missing: %,d)", low, hd, high, size, numMissing());
}
public static class Options {
diff --git a/src/org/jgroups/util/DynamicBuffer.java b/src/org/jgroups/util/DynamicBuffer.java
index 0cb72ea1408..caabb42c29b 100644
--- a/src/org/jgroups/util/DynamicBuffer.java
+++ b/src/org/jgroups/util/DynamicBuffer.java
@@ -197,6 +197,32 @@ public boolean add(MessageBatch batch, Function seqno_getter, boolean re
}
}
+ public boolean add(final List> list, boolean remove_added_elements, T const_value) {
+ if(list == null || list.isEmpty())
+ return false;
+ boolean added=false;
+ // find the highest seqno (unfortunately, the list is not ordered by seqno)
+ long highest_seqno=findHighestSeqno(list);
+ lock.lock();
+ try {
+ if(highest_seqno != -1 && computeRow(highest_seqno) >= matrix.length)
+ resize(highest_seqno);
+
+ for(Iterator> it=list.iterator(); it.hasNext();) {
+ LongTuple tuple=it.next();
+ long seqno=tuple.getVal1();
+ T element=const_value != null? const_value : tuple.getVal2();
+ if(add(seqno, element, null, null))
+ added=true;
+ else if(remove_added_elements)
+ it.remove();
+ }
+ return added;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
/**
* Returns an element at seqno
diff --git a/src/org/jgroups/util/FixedBuffer.java b/src/org/jgroups/util/FixedBuffer.java
index 2d30f4ace8a..9817021e65d 100644
--- a/src/org/jgroups/util/FixedBuffer.java
+++ b/src/org/jgroups/util/FixedBuffer.java
@@ -140,6 +140,27 @@ public boolean add(MessageBatch batch, Function seqno_getter, boolean re
}
}
+ public boolean add(final List> list, boolean remove_added_elements, T const_value) {
+ if(list == null || list.isEmpty())
+ return false;
+ boolean added=false;
+ lock.lock();
+ try {
+ for(Iterator> it=list.iterator(); it.hasNext();) {
+ LongTuple tuple=it.next();
+ long seqno=tuple.getVal1();
+ T element=const_value != null? const_value : tuple.getVal2();
+ if(add(seqno, element, null, null))
+ added=true;
+ else if(remove_added_elements)
+ it.remove();
+ }
+ return added;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
/**
* Removes the next non-null element and advances hd
@@ -356,13 +377,6 @@ protected int index(long seqno) {
//return (int)((seqno - offset - 1) & (capacity() - 1));
}
- protected int index(long seqno, int capacity) {
- // return (int)((seqno-1) % capacity); // apparently slower than the computation below
-
- // apparently this is faster than mod for n^2 capacity
- return (int)((seqno - offset - 1) & (capacity - 1));
- }
-
@GuardedBy("lock")
protected boolean block(long seqno) {
while(open && seqno - low > capacity()) {
diff --git a/src/org/jgroups/util/MaxOneThreadPerSender.java b/src/org/jgroups/util/MaxOneThreadPerSender.java
index f6a02b10426..5baa4a99bbb 100644
--- a/src/org/jgroups/util/MaxOneThreadPerSender.java
+++ b/src/org/jgroups/util/MaxOneThreadPerSender.java
@@ -7,7 +7,8 @@
import org.jgroups.protocols.TP;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
@@ -148,7 +149,8 @@ protected Entry(Address sender, boolean mcast, AsciiString cluster_name) {
this.sender=sender;
this.cluster_name=cluster_name;
int cap=max_buffer_size > 0? max_buffer_size : DEFAULT_INITIAL_CAPACITY; // initial capacity
- batch=new MessageBatch(cap).dest(tp.getAddress()).sender(sender).clusterName(cluster_name).multicast(mcast);
+ batch=new MessageBatch(cap).dest(tp.getAddress()).sender(sender).clusterName(cluster_name)
+ .multicast(mcast).mode(MessageBatch.Mode.REG); // only regular messages are queued
batch.array().increment(DEFAULT_INCREMENT);
msg_queue=max_buffer_size > 0? new FastArray<>(max_buffer_size) : new FastArray<>(DEFAULT_INITIAL_CAPACITY);
msg_queue.increment(DEFAULT_INCREMENT);
@@ -228,7 +230,6 @@ protected boolean workAvailable() {
}
}
-
// unsynchronized on batch but who cares
public String toString() {
return String.format("msg_queue.size=%,d msg_queue.cap: %,d batch.cap=%,d queued msgs=%,d submitted batches=%,d",
@@ -251,8 +252,17 @@ public void run() {
while(entry.workAvailable() || entry.adders.decrementAndGet() != 0) {
try {
MessageBatch mb=entry.batch;
- if(mb == null || mb.isEmpty() || (!mb.multicast() && tp.unicastDestMismatch(mb.dest())))
+ if(mb.isEmpty())
continue;
+ if(!mb.multicast()) {
+ // due to an incorrect (e.g. late) view change, the cached batch's destination might be
+ // different from our local address. If this is the case, change the cached batch's dest address
+ if(tp.unicastDestMismatch(mb.dest())) {
+ Address dest=tp.addr();
+ if(dest != null)
+ mb.dest(dest);
+ }
+ }
tp.passBatchUp(mb, !loopback, !loopback);
}
catch(Throwable t) {
diff --git a/src/org/jgroups/util/MessageBatch.java b/src/org/jgroups/util/MessageBatch.java
index da91f76c218..9759a268514 100644
--- a/src/org/jgroups/util/MessageBatch.java
+++ b/src/org/jgroups/util/MessageBatch.java
@@ -93,6 +93,7 @@ public MessageBatch(Address dest, Address sender, AsciiString cluster_name, bool
public int capacity() {return messages.capacity();}
public long timestamp() {return timestamp;}
public MessageBatch timestamp(long ts) {timestamp=ts; return this;}
+ public MessageBatch increment(int i) {messages.increment(i); return this;}
/** Returns the underlying message array. This is only intended for testing ! */
@@ -294,7 +295,6 @@ public String toString() {
if(sb.length() > 0)
sb.append(", ");
sb.append(size() + " messages [capacity=" + messages.capacity() + "]");
-
return sb.toString();
}
diff --git a/src/org/jgroups/util/MessageCache.java b/src/org/jgroups/util/MessageCache.java
index bfed5879641..ea863ed334d 100644
--- a/src/org/jgroups/util/MessageCache.java
+++ b/src/org/jgroups/util/MessageCache.java
@@ -16,27 +16,21 @@
*/
public class MessageCache {
protected final Map> map=new ConcurrentHashMap<>();
- protected volatile boolean is_empty=true;
public MessageCache add(Address sender, Message msg) {
Queue list=map.computeIfAbsent(sender, addr -> new ConcurrentLinkedQueue<>());
list.add(msg);
- is_empty=false;
return this;
}
public Collection drain(Address sender) {
if(sender == null)
return null;
- Queue queue=map.remove(sender);
- if(map.isEmpty())
- is_empty=true;
- return queue;
+ return map.remove(sender);
}
public MessageCache clear() {
map.clear();
- is_empty=true;
return this;
}
@@ -46,7 +40,7 @@ public int size() {
}
public boolean isEmpty() {
- return is_empty;
+ return map.isEmpty();
}
public String toString() {
diff --git a/src/org/jgroups/util/MockTransport.java b/src/org/jgroups/util/MockTransport.java
new file mode 100644
index 00000000000..fe1649607ac
--- /dev/null
+++ b/src/org/jgroups/util/MockTransport.java
@@ -0,0 +1,31 @@
+package org.jgroups.util;
+
+import org.jgroups.Message;
+import org.jgroups.PhysicalAddress;
+import org.jgroups.protocols.NoBundler;
+import org.jgroups.protocols.TP;
+
+/**
+ * A dummy implementation of {@link TP}
+ * @author Bela Ban
+ * @since 5.4
+ */
+public class MockTransport extends TP {
+
+ public void init() throws Exception {
+ super.init();
+ setBundler(new NoBundler());
+ }
+ public boolean supportsMulticasting() {return true;}
+ public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception {}
+ public String getInfo() {return null;}
+ protected PhysicalAddress getPhysicalAddress() {return null;}
+ public MockTransport cluster(AsciiString s) {this.cluster_name=s; return this;}
+
+
+ public Object down(Message msg) {
+ return null;
+ }
+
+
+}
diff --git a/src/org/jgroups/util/TLS.java b/src/org/jgroups/util/TLS.java
index 7dd805a48b7..8a291af58ce 100644
--- a/src/org/jgroups/util/TLS.java
+++ b/src/org/jgroups/util/TLS.java
@@ -8,8 +8,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLServerSocket;
-import java.io.File;
-import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
@@ -120,11 +118,6 @@ public void init() throws Exception {
truststore_type=keystore_type;
truststore_password=keystore_password;
}
- if(keystore_path != null) {
- File tmp=new File(keystore_path);
- if(!tmp.exists())
- throw new FileNotFoundException(keystore_path);
- }
}
public SSLContext createContext() {
diff --git a/src/org/jgroups/util/Table.java b/src/org/jgroups/util/Table.java
index 42f7085b187..f5417d03283 100644
--- a/src/org/jgroups/util/Table.java
+++ b/src/org/jgroups/util/Table.java
@@ -105,7 +105,7 @@ public Table(int num_rows, int elements_per_row, long offset, double resize_fact
* @param num_rows the number of rows in the matrix
* @param elements_per_row the number of elements per row
* @param offset the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1
- * @param resize_factor teh factor with which to increase the number of rows
+ * @param resize_factor the factor with which to increase the number of rows
* @param max_compaction_time the max time in milliseconds after we attempt a compaction
*/
public Table(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time) {
diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java
index afe3b09c57d..1704f709e09 100644
--- a/src/org/jgroups/util/Util.java
+++ b/src/org/jgroups/util/Util.java
@@ -7,6 +7,7 @@
import org.jgroups.blocks.cs.Connection;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.jmx.JmxConfigurator;
+import org.jgroups.jmx.ResourceDMBean;
import org.jgroups.logging.Log;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.GMS;
@@ -107,7 +108,9 @@ public class Util {
private static final byte[] TYPE_BOOLEAN_TRUE={TYPE_BOOLEAN, 1};
private static final byte[] TYPE_BOOLEAN_FALSE={TYPE_BOOLEAN, 0};
- public static final Class extends Protocol>[] getUnicastProtocols() {return new Class[]{UNICAST3.class};}
+ public static final Class extends Protocol>[] getUnicastProtocols() {
+ return new Class[]{UNICAST3.class,UNICAST4.class};
+ }
public enum AddressScope {GLOBAL,SITE_LOCAL,LINK_LOCAL,LOOPBACK,NON_LOOPBACK}
@@ -380,7 +383,13 @@ public static String printViews(JChannel ... channels) {
return sb.toString();
}
-
+ public static String print(MessageBatch batch, boolean print_headers) {
+ int count=1;
+ StringBuilder sb=new StringBuilder(String.format("%s:\n", batch.toString()));
+ for(Message msg: batch)
+ sb.append(String.format(" %d: %s%s\n", count++, msg, print_headers? String.format(", hdrs: %s", msg.printHeaders()) : ""));
+ return sb.toString();
+ }
/**
* Waits until a list has the expected number of elements. Throws an exception if not met
@@ -1237,7 +1246,7 @@ public static T streamableFromByteBuffer(Class extends
public static T streamableFromByteBuffer(Class extends Streamable> cl, ByteBuffer buffer) throws Exception {
if(buffer == null) return null;
DataInput in=new ByteBufferInputStream(buffer);
- T retval=(T)cl.newInstance();
+ T retval=(T)cl.getConstructor().newInstance();
retval.readFrom(in);
return retval;
}
@@ -1279,10 +1288,10 @@ public static ByteArray messageToBuffer(Message msg) throws Exception {
}
- public static Message messageFromBuffer(byte[] buf, int offset, int length, MessageFactory mf) throws Exception {
+ public static Message messageFromBuffer(byte[] buf, int offset, int length) throws Exception {
ByteArrayDataInputStream in=new ByteArrayDataInputStream(buf, offset, length);
short type=in.readShort();
- Message msg=mf.create(type);
+ Message msg=MessageFactory.create(type);
msg.readFrom(in);
return msg;
}
@@ -1298,12 +1307,12 @@ public static ByteArray messageToByteBuffer(Message msg) throws Exception {
}
- public static Message messageFromByteBuffer(byte[] buffer, int offset, int length, MessageFactory mf) throws Exception {
+ public static Message messageFromByteBuffer(byte[] buffer, int offset, int length) throws Exception {
DataInput in=new ByteArrayDataInputStream(buffer,offset,length);
if(!in.readBoolean())
return null;
short type=in.readShort();
- Message msg=mf.create(type);
+ Message msg=MessageFactory.create(type);
msg.readFrom(in);
return msg;
}
@@ -1435,9 +1444,9 @@ public static void writeMessage(Message msg, DataOutput dos, boolean multicast)
msg.writeTo(dos);
}
- public static Message readMessage(DataInput in, MessageFactory mf) throws IOException, ClassNotFoundException {
+ public static Message readMessage(DataInput in) throws IOException, ClassNotFoundException {
short type=in.readShort();
- Message msg=mf.create(type);
+ Message msg=MessageFactory.create(type);
msg.readFrom(in);
return msg;
}
@@ -1512,7 +1521,7 @@ public static void writeMessageListHeader(Address dest, Address src, byte[] clus
}
- public static List readMessageList(DataInput in, short transport_id, MessageFactory mf)
+ public static List readMessageList(DataInput in, short transport_id)
throws IOException, ClassNotFoundException {
List list=new LinkedList<>();
Address dest=Util.readAddress(in);
@@ -1527,7 +1536,7 @@ public static List readMessageList(DataInput in, short transport_id, Me
for(int i=0; i < len; i++) {
short type=in.readShort(); // skip the
- Message msg=mf.create(type);
+ Message msg=MessageFactory.create(type);
msg.readFrom(in);
msg.setDest(dest);
if(msg.getSrc() == null)
@@ -1549,7 +1558,7 @@ public static List readMessageList(DataInput in, short transport_id, Me
*
* @return an array of 2 MessageBatches in the order above, the first batch is at index 0
*/
- public static MessageBatch[] readMessageBatch(DataInput in, boolean multicast, MessageFactory factory)
+ public static MessageBatch[] readMessageBatch(DataInput in, boolean multicast)
throws IOException, ClassNotFoundException {
MessageBatch[] batches=new MessageBatch[2]; // [0]: reg, [1]: OOB
Address dest=Util.readAddress(in);
@@ -1562,7 +1571,7 @@ public static MessageBatch[] readMessageBatch(DataInput in, boolean multicast, M
int len=in.readInt();
for(int i=0; i < len; i++) {
short type=in.readShort();
- Message msg=factory.create(type).setDest(dest).setSrc(src);
+ Message msg=MessageFactory.create(type).setDest(dest).setSrc(src);
msg.readFrom(in);
boolean oob=msg.isFlagSet(Message.Flag.OOB);
int index=0;
@@ -1590,7 +1599,6 @@ public static void parse(InputStream input, BiConsumer msg_consum
if(msg_consumer == null && batch_consumer == null)
return;
byte[] tmp=new byte[Global.INT_SIZE];
- MessageFactory mf=new DefaultMessageFactory();
try(DataInputStream dis=new DataInputStream(input)) {
for(;;) {
// for TCP, we send the length first; this needs to be skipped as it is not part of the JGroups payload
@@ -1623,7 +1631,7 @@ public static void parse(InputStream input, BiConsumer msg_consum
boolean is_message_list=(flags & LIST) == LIST;
boolean multicast=(flags & MULTICAST) == MULTICAST;
if(is_message_list) { // used if message bundling is enabled
- final MessageBatch[] batches=Util.readMessageBatch(dis,multicast, mf);
+ final MessageBatch[] batches=Util.readMessageBatch(dis,multicast);
for(MessageBatch batch: batches) {
if(batch == null)
continue;
@@ -1636,7 +1644,7 @@ public static void parse(InputStream input, BiConsumer msg_consum
}
}
else {
- Message msg=Util.readMessage(dis, mf);
+ Message msg=Util.readMessage(dis);
if(msg_consumer != null)
msg_consumer.accept(version, msg);
}
@@ -3009,7 +3017,7 @@ public static Collection determineMergeParticipants(Map m
View view=map.get(coord);
Collection mbrs=view != null? view.getMembers() : null;
if(mbrs != null)
- all_addrs.removeAll(mbrs);
+ mbrs.forEach(all_addrs::remove);
}
coords.addAll(all_addrs);
return coords;
@@ -3521,6 +3529,51 @@ public static Method findMethod(Class> target_class, String method_name, Objec
return retval;
}
+ public static Method findMethod2(Class> target_class, String method_name, Object[] args) throws Exception {
+ int len=args != null? args.length : 0;
+ Method retval=null;
+ Method[] methods=getAllMethods(target_class);
+ for(int i=0; i < methods.length; i++) {
+ Method m=methods[i];
+ if(m.getName().equals(method_name)) {
+ Class>[] parameter_types=m.getParameterTypes();
+ if(parameter_types.length == len) {
+ boolean all_args_match=true;
+ // now check if actual and parameter types match:
+ for(int j=0; j < parameter_types.length; j++) {
+ Class> parameter=parameter_types[j];
+ Class> actual=args[j] != null? args[j].getClass() : null;
+ if(actual != null && !isAssignableFrom(parameter, actual)) {
+ all_args_match=false;
+ break;
+ }
+ }
+ if(all_args_match)
+ return m;
+ }
+ }
+ }
+ return retval;
+ }
+
+ public static boolean isAssignableFrom(Class> left, Class> right) {
+ if(left == null)
+ return false;
+ if(right == null)
+ return left == null || !left.isPrimitive();
+ if(left == right)
+ return true;
+ // at this point, left and right are not null
+ if(left.isAssignableFrom(right))
+ return true;
+ return ResourceDMBean.isNumber(left) && ResourceDMBean.isNumber(right);
+ }
+
+ public static Object invoke(Object target, String method_name, Object... args) throws Exception {
+ Method method=Util.findMethod2(target.getClass(), method_name, args);
+ return method.invoke(target, args);
+ }
+
/**
* The method walks up the class hierarchy and returns all methods of this class
* and those inherited from superclasses and superinterfaces.
@@ -4955,10 +5008,7 @@ public static String substituteVariable(String input, Properties p) {
sb.append(ch);
break;
default:
- if(cache != null)
- cache.append(ch);
- else
- sb.append(ch);
+ Objects.requireNonNullElse(cache, sb).append(ch);
break;
}
}
diff --git a/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java b/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java
index 51be011dd71..67391dc6607 100644
--- a/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java
+++ b/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java
@@ -5,7 +5,6 @@
import org.jgroups.BytesMessage;
-import org.jgroups.DefaultMessageFactory;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.conf.ClassConfigurator;
@@ -33,7 +32,6 @@ public void testInitWrongKeystoreProperties() {
SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName("unkownKeystore.keystore");
try {
encrypt.init();
- encrypt.msgFactory(new DefaultMessageFactory());
}
catch(Exception e) {
System.out.println("didn't find incorrect keystore (as expected): " + e.getMessage());
@@ -43,7 +41,6 @@ public void testInitWrongKeystoreProperties() {
public void testInitKeystoreProperties() throws Exception {
SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName("defaultStore.keystore");
encrypt.init();
- encrypt.msgFactory(new DefaultMessageFactory());
}
public void testMessageDownEncode() throws Exception {
@@ -157,7 +154,6 @@ public void testEncryptEntireMessage() throws Exception {
protected SYM_ENCRYPT create(String keystore) throws Exception {
SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName(keystore).symAlgorithm(symAlgorithm()).symIvLength(symIvLength());
encrypt.init();
- encrypt.msgFactory(new DefaultMessageFactory());
return encrypt;
}
diff --git a/tests/junit-functional/org/jgroups/protocols/EncryptTest.java b/tests/junit-functional/org/jgroups/protocols/EncryptTest.java
index 6cf7a3edace..275bcf19595 100644
--- a/tests/junit-functional/org/jgroups/protocols/EncryptTest.java
+++ b/tests/junit-functional/org/jgroups/protocols/EncryptTest.java
@@ -129,7 +129,6 @@ public void testMessageSendingByRogueUsingEncryption() throws Exception {
secretKey.setAccessible(true);
Util.setField(secretKey, encrypt, secret_key);
encrypt.init();
- encrypt.msgFactory(new DefaultMessageFactory());
short encrypt_id=ClassConfigurator.getProtocolId(SYM_ENCRYPT.class);
byte[] iv = encrypt.makeIv();
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_ConnectionTests.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_ConnectionTests.java
index 6bd48ac9284..32863b28edd 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST_ConnectionTests.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_ConnectionTests.java
@@ -3,6 +3,7 @@
import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.MyReceiver;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
@@ -12,32 +13,32 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
-import java.util.stream.Stream;
/**
* Tests unilateral closings of UNICAST connections. The test scenarios are described in doc/design/UNICAST2.txt.
* Some of the tests may fail occasionally until https://issues.redhat.com/browse/JGRP-1594 is fixed
* @author Bela Ban
*/
-@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="configProvider")
public class UNICAST_ConnectionTests {
- protected JChannel a, b;
- protected Address a_addr, b_addr;
- protected MyReceiver r1, r2;
- protected Protocol u1, u2;
+ protected JChannel a, b;
+ protected Address a_addr, b_addr;
+ protected MyReceiver r1, r2;
+ protected Protocol u1, u2;
protected static final String CLUSTER="UNICAST_ConnectionTests";
-
+
@DataProvider
static Object[][] configProvider() {
return new Object[][]{
- {UNICAST3.class}
+ {UNICAST3.class},
+ {UNICAST4.class}
};
}
- protected void setup(Class extends UNICAST3> unicast_class) throws Exception {
- r1=new MyReceiver("A");
- r2=new MyReceiver("B");
+ protected void setup(Class extends Protocol> unicast_class) throws Exception {
+ r1=new MyReceiver().name("A");
+ r2=new MyReceiver().name("B");
a=createChannel(unicast_class, "A");
a.connect(CLUSTER);
a_addr=a.getAddress();
@@ -59,7 +60,7 @@ protected void setup(Class extends UNICAST3> unicast_class) throws Exception {
* @throws Exception
*/
@Test(dataProvider="configProvider")
- public void testRegularMessageReception(Class extends UNICAST3> unicast) throws Exception {
+ public void testRegularMessageReception(Class extends Protocol> unicast) throws Exception {
setup(unicast);
sendAndCheck(a, b_addr, 100, r2);
sendAndCheck(b,a_addr,50,r1);
@@ -70,7 +71,7 @@ public void testRegularMessageReception(Class extends UNICAST3> unicast) throw
* Tests case #3 of UNICAST.new.txt
*/
@Test(dataProvider="configProvider")
- public void testBothChannelsClosing(Class extends UNICAST3> unicast) throws Exception {
+ public void testBothChannelsClosing(Class extends Protocol> unicast) throws Exception {
setup(unicast);
sendToEachOtherAndCheck(10);
@@ -78,7 +79,7 @@ public void testBothChannelsClosing(Class extends UNICAST3> unicast) throws Ex
System.out.println("==== Closing the connections on both sides");
removeConnection(u1, b_addr);
removeConnection(u2, a_addr);
- r1.clear(); r2.clear();
+ r1.reset(); r2.reset();
// causes new connection establishment
sendToEachOtherAndCheck(10);
@@ -89,7 +90,7 @@ public void testBothChannelsClosing(Class extends UNICAST3> unicast) throws Ex
* Scenario #4 (A closes the connection unilaterally (B keeps it open), then reopens it and sends messages)
*/
@Test(dataProvider="configProvider")
- public void testAClosingUnilaterally(Class extends UNICAST3> unicast) throws Exception {
+ public void testAClosingUnilaterally(Class extends Protocol> unicast) throws Exception {
setup(unicast);
sendToEachOtherAndCheck(10);
@@ -105,7 +106,7 @@ public void testAClosingUnilaterally(Class extends UNICAST3> unicast) throws E
* Scenario #5 (B closes the connection unilaterally (A keeps it open), then A sends messages to B)
*/
@Test(dataProvider="configProvider")
- public void testBClosingUnilaterally(Class extends UNICAST3> unicast) throws Exception {
+ public void testBClosingUnilaterally(Class extends Protocol> unicast) throws Exception {
setup(unicast);
sendToEachOtherAndCheck(10);
@@ -117,11 +118,7 @@ public void testBClosingUnilaterally(Class extends UNICAST3> unicast) throws E
sendAndCheck(a, b_addr, 10, r2);
}
- @Test(dataProvider="configProvider")
- public void testBRemovingUnilaterally(Class extends UNICAST3> unicast) throws Exception {
- if(!unicast.equals(UNICAST3.class))
- return; // only tested for UNICAST3
-
+ public void testBRemovingUnilaterally(Class extends Protocol> unicast) throws Exception {
setup(unicast);
sendAndCheck(a, b_addr, 10, r2);
@@ -133,13 +130,25 @@ public void testBRemovingUnilaterally(Class extends UNICAST3> unicast) throws
sendAndCheck(a, b_addr, 10, r2);
}
+ public void testBRemovingUnilaterallyOOB(Class extends Protocol> unicast) throws Exception {
+ setup(unicast);
+ sendAndCheck(a, b_addr, 10, r2);
+
+ // now remove connection on A unilaterally
+ System.out.println("==== Removing the connection on B");
+ removeConnection(u2, a_addr, true);
+
+ // then send OOB messages from A to B
+ sendAndCheck(a, b_addr, true, 10, r2);
+ }
+
/**
* Scenario #6 (A closes the connection unilaterally (B keeps it open), then reopens it and sends messages,
* but loses the first message
*/
@Test(dataProvider="configProvider")
- public void testAClosingUnilaterallyButLosingFirstMessage(Class extends UNICAST3> unicast) throws Exception {
+ public void testAClosingUnilaterallyButLosingFirstMessage(Class extends Protocol> unicast) throws Exception {
setup(unicast);
sendAndCheck(a, b_addr, 10, r2);
@@ -157,7 +166,7 @@ public void testAClosingUnilaterallyButLosingFirstMessage(Class extends UNICAS
/** Tests concurrent reception of multiple messages with a different conn_id (https://issues.redhat.com/browse/JGRP-1347) */
@Test(dataProvider="configProvider")
- public void testMultipleConcurrentResets(Class extends UNICAST3> unicast) throws Exception {
+ public void testMultipleConcurrentResets(Class extends Protocol> unicast) throws Exception {
setup(unicast);
sendAndCheck(a, b_addr, 1, r2);
@@ -165,12 +174,10 @@ public void testMultipleConcurrentResets(Class extends UNICAST3> unicast) thro
System.out.println("==== Closing the connection on A");
removeConnection(u1, b_addr);
- r2.clear();
-
+ r2.reset();
final Protocol ucast=b.getProtocolStack().findProtocol(Util.getUnicastProtocols());
int NUM=10;
-
final List msgs=new ArrayList<>(NUM);
for(int i=1; i <= NUM; i++) {
@@ -201,16 +208,17 @@ public void testMultipleConcurrentResets(Class extends UNICAST3> unicast) thro
for(Thread thread: threads)
thread.join();
- List list=r2.getMessages();
+ List list=r2.list();
System.out.println("list = " + print(list));
assert list.size() == 1 : "list must have 1 element but has " + list.size() + ": " + print(list);
}
@Test(dataProvider="configProvider")
- public void testMessageToNonExistingMember(Class extends UNICAST3> unicast) throws Exception {
+ public void testMessageToNonExistingMember(Class extends Protocol> unicast) throws Exception {
setup(unicast);
- Stream.of(a,b).forEach(ch -> ((UNICAST3)ch.getProtocolStack().findProtocol(unicast)).setMaxRetransmitTime(5000));
+ for(JChannel ch: List.of(a,b))
+ Util.invoke(ch.stack().findProtocol(unicast), "setMaxRetransmitTime", 5000L);
Address target=Util.createRandomAddress("FakeAddress");
a.send(target, "hello");
Protocol prot=a.getProtocolStack().findProtocol(unicast);
@@ -224,10 +232,11 @@ public void testMessageToNonExistingMember(Class extends UNICAST3> unicast) th
assert !(Boolean)hasSendConnectionTo.invoke(prot, target);
}
-
protected static Header createDataHeader(Protocol unicast, long seqno, short conn_id, boolean first) {
if(unicast instanceof UNICAST3)
return UnicastHeader3.createDataHeader(seqno, conn_id, first);
+ else if(unicast instanceof UNICAST4)
+ return UnicastHeader.createDataHeader(seqno, conn_id, first);
throw new IllegalArgumentException("protocol " + unicast.getClass().getSimpleName() + " needs to be UNICAST3");
}
@@ -242,8 +251,8 @@ protected void sendToEachOtherAndCheck(int num) throws Exception {
a.send(b_addr, i);
b.send(a_addr, i);
}
- List l1=r1.getMessages();
- List l2=r2.getMessages();
+ List l1=r1.list();
+ List l2=r2.list();
for(int i=0; i < 10; i++) {
if(l1.size() == num && l2.size() == num)
break;
@@ -255,70 +264,45 @@ protected void sendToEachOtherAndCheck(int num) throws Exception {
assert l2.size() == num;
}
- protected static void sendAndCheck(JChannel channel, Address dest, int num, MyReceiver receiver) throws Exception {
- receiver.clear();
- for(int i=1; i <= num; i++)
- channel.send(dest, i);
- List list=receiver.getMessages();
- for(int i=0; i < 20; i++) {
- if(list.size() == num)
- break;
- Util.sleep(500);
+ protected static void sendAndCheck(JChannel channel, Address dest, int num, MyReceiver r) throws Exception {
+ sendAndCheck(channel, dest, false, num, r);
+ }
+
+ protected static void sendAndCheck(JChannel channel, Address dest, boolean oob, int num, MyReceiver r) throws Exception {
+ r.reset();
+ for(int i=1; i <= num; i++) {
+ Message msg=new ObjectMessage(dest, i);
+ if(oob)
+ msg.setFlag(Message.Flag.OOB);
+ channel.send(msg);
}
- System.out.println("list = " + print(list));
+ List list=r.list();
+ Util.waitUntilTrue(10000, 500, () -> list.size() == num);
+ System.out.println("list = " + list);
int size=list.size();
assert size == num : "list has " + size + " elements (expected " + num + "): " + list;
}
- protected static void removeConnection(Protocol prot, Address target) {
+ protected static void removeConnection(Protocol prot, Address target) throws Exception {
removeConnection(prot, target, false);
}
- protected static void removeConnection(Protocol prot, Address target, boolean remove) {
- if(prot instanceof UNICAST3) {
- UNICAST3 unicast=(UNICAST3)prot;
- if(remove)
- unicast.removeReceiveConnection(target);
- else
- unicast.closeConnection(target);
- }
+ protected static void removeConnection(Protocol prot, Address target, boolean remove) throws Exception {
+ if(remove)
+ Util.invoke(prot, "removeReceiveConnection", target);
else
- throw new IllegalArgumentException("prot (" + prot + ") needs to be UNICAST3");
+ Util.invoke(prot, "closeConnection", target);
}
-
protected static String print(List list) {
return Util.printListWithDelimiter(list, " ");
}
-
protected static JChannel createChannel(Class extends Protocol> unicast_class, String name) throws Exception {
Protocol unicast=unicast_class.getDeclaredConstructor().newInstance();
return new JChannel(new SHARED_LOOPBACK(), unicast).name(name);
}
- protected static class MyReceiver implements Receiver {
- final String name;
- final List msgs=new ArrayList<>(20);
-
- public MyReceiver(String name) {
- this.name=name;
- }
-
- public void receive(Message msg) {
- synchronized(msgs) {
- msgs.add(msg.getObject());
- }
- }
-
- public List getMessages() {return msgs;}
- public void clear() {msgs.clear();}
- public int size() {return msgs.size();}
-
- public String toString() {
- return name;
- }
- }
protected static class Drop extends Protocol {
protected volatile boolean drop_next=false;
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_ContentionTest.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_ContentionTest.java
index a95c53edbbf..bd004c7f34c 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST_ContentionTest.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_ContentionTest.java
@@ -30,14 +30,15 @@ protected void tearDown() throws Exception {
@DataProvider
static Object[][] provider() {
return new Object[][] {
- {UNICAST3.class}
+ {UNICAST3.class},
+ {UNICAST4.class}
};
}
@Test(dataProvider="provider")
- public void testSimpleMessageReception(Class extends UNICAST3> unicast_class) throws Exception {
+ public void testSimpleMessageReception(Class extends Protocol> unicast_class) throws Exception {
a=create(unicast_class, "A");
b=create(unicast_class, "B");
MyReceiver r1=new MyReceiver("A"), r2=new MyReceiver("B");
@@ -56,25 +57,24 @@ public void testSimpleMessageReception(Class extends UNICAST3> unicast_class)
}
for(int i=0; i < 10; i++) {
- if(r1.getNum() == NUM * 2 && r2.getNum() == NUM * 2)
+ if(r1.num() == NUM * 2 && r2.num() == NUM * 2)
break;
Util.sleep(500);
}
- System.out.println("c1 received " + r1.getNum() + " msgs, " + getNumberOfRetransmissions(a) + " retransmissions");
- System.out.println("c2 received " + r2.getNum() + " msgs, " + getNumberOfRetransmissions(b) + " retransmissions");
+ System.out.printf("%s: %,d msgs, %s xmits\n", "c1", r1.num(),
+ Util.invoke(a.stack().findProtocol(Util.getUnicastProtocols()), "getNumXmits"));
+ System.out.printf("%s: %,d msgs, %s xmits\n", "c2", r2.num(),
+ Util.invoke(b.stack().findProtocol(Util.getUnicastProtocols()), "getNumXmits"));
- assert r1.getNum() == NUM * 2: "expected " + NUM *2 + ", but got " + r1.getNum();
- assert r2.getNum() == NUM * 2: "expected " + NUM *2 + ", but got " + r2.getNum();
+ assert r1.num() == NUM * 2: "expected " + NUM *2 + ", but got " + r1.num();
+ assert r2.num() == NUM * 2: "expected " + NUM *2 + ", but got " + r2.num();
}
- /**
- * Multiple threads (NUM_THREADS) send messages (NUM_MSGS)
- * @throws Exception
- */
+ /** Multiple threads (NUM_THREADS) send messages (NUM_MSGS) */
@Test(dataProvider="provider")
- public void testMessageReceptionUnderHighLoad(Class extends UNICAST3> unicast_class) throws Exception {
+ public void testMessageReceptionUnderHighLoad(Class extends Protocol> unicast_class) throws Exception {
CountDownLatch latch=new CountDownLatch(1);
a=create(unicast_class, "A");
b=create(unicast_class, "B");
@@ -107,31 +107,26 @@ public void testMessageReceptionUnderHighLoad(Class extends UNICAST3> unicast_
long NUM_EXPECTED_MSGS=NUM_THREADS * NUM_MSGS;
for(int i=0; i < 20; i++) {
- if(r1.getNum() == NUM_EXPECTED_MSGS && r2.getNum() == NUM_EXPECTED_MSGS)
+ if(r1.num() == NUM_EXPECTED_MSGS && r2.num() == NUM_EXPECTED_MSGS)
break;
Util.sleep(2000);
}
- System.out.println("c1 received " + r1.getNum() + " msgs, " + getNumberOfRetransmissions(a) + " retransmissions");
- System.out.println("c2 received " + r2.getNum() + " msgs, " + getNumberOfRetransmissions(b) + " retransmissions");
+ System.out.printf("%s: %,d msgs, %s xmits\n", "c1", r1.num(),
+ Util.invoke(a.stack().findProtocol(Util.getUnicastProtocols()), "getNumXmits"));
+ System.out.printf("%s: %,d msgs, %s xmits\n", "c2", r2.num(),
+ Util.invoke(b.stack().findProtocol(Util.getUnicastProtocols()), "getNumXmits"));
- assert r1.getNum() == NUM_EXPECTED_MSGS : "expected " + NUM_EXPECTED_MSGS + ", but got " + r1.getNum();
- assert r2.getNum() == NUM_EXPECTED_MSGS : "expected " + NUM_EXPECTED_MSGS + ", but got " + r2.getNum();
+ assert r1.num() == NUM_EXPECTED_MSGS : "expected " + NUM_EXPECTED_MSGS + ", but got " + r1.num();
+ assert r2.num() == NUM_EXPECTED_MSGS : "expected " + NUM_EXPECTED_MSGS + ", but got " + r2.num();
}
- protected static JChannel create(Class extends UNICAST3> unicast_class, String name) throws Exception {
- return new JChannel(new SHARED_LOOPBACK(),
- unicast_class.getDeclaredConstructor().newInstance().setXmitInterval(500)).name(name);
+ protected static JChannel create(Class extends Protocol> unicast_class, String name) throws Exception {
+ Protocol p=unicast_class.getDeclaredConstructor().newInstance();
+ Util.invoke(p, "setXmitInterval", 500L);
+ return new JChannel(new SHARED_LOOPBACK(), p).name(name);
}
- private static long getNumberOfRetransmissions(JChannel ch) {
- Protocol prot=ch.getProtocolStack().findProtocol(Util.getUnicastProtocols());
- if(prot instanceof UNICAST3)
- return ((UNICAST3)prot).getNumXmits();
- return 0;
- }
-
-
private static class MySender extends Thread {
private final JChannel ch;
@@ -178,11 +173,11 @@ public MyReceiver(String name) {
public void receive(Message msg) {
if(num.incrementAndGet() % MOD == 0) {
- System.out.println("[" + name + "] received " + getNum() + " msgs");
+ System.out.println("[" + name + "] received " + num() + " msgs");
}
}
- public int getNum() {
+ public int num() {
return num.get();
}
}
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_DropFirstAndLastTest.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_DropFirstAndLastTest.java
index 7944ae5df31..6e3aa131ab7 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST_DropFirstAndLastTest.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_DropFirstAndLastTest.java
@@ -14,7 +14,7 @@
import java.util.List;
/**
- * Tests UNICAST2. Created to test the last-message-dropped problem, see https://issues.redhat.com/browse/JGRP-1548.
+ * Tests UNICAST{3,4}. Created to test the last-message-dropped problem, see https://issues.redhat.com/browse/JGRP-1548.
* @author Bela Ban
* @since 3.3
*/
@@ -24,7 +24,7 @@ public class UNICAST_DropFirstAndLastTest {
protected MyReceiver rb;
protected DISCARD discard; // on A
- protected void setup(Class extends UNICAST3> unicast_class) throws Exception {
+ protected void setup(Class extends Protocol> unicast_class) throws Exception {
a=createChannel(unicast_class, "A");
discard=a.getProtocolStack().findProtocol(DISCARD.class);
assert discard != null;
@@ -39,9 +39,10 @@ protected void setup(Class extends UNICAST3> unicast_class) throws Exception {
@DataProvider
- static Object[][] configProvider() {
+ static Object[][] createUnicast() {
return new Object[][]{
- {UNICAST3.class}
+ {UNICAST3.class},
+ {UNICAST4.class}
};
}
@@ -50,13 +51,13 @@ static Object[][] configProvider() {
* https://issues.redhat.com/browse/JGRP-1548 now needs to make sure message 5 is retransmitted to B
* within a short time period, and we don't have to rely on the stable task to kick in.
*/
- @Test(dataProvider="configProvider")
- public void testLastMessageDropped(Class extends UNICAST3> unicast_class) throws Exception {
+ @Test(dataProvider="createUnicast")
+ public void testLastMessageDropped(Class extends Protocol> unicast_class) throws Exception {
setup(unicast_class);
setLevel("trace", a, b);
Address dest=b.getAddress();
for(int i=1; i <= 5; i++) {
- Message msg=new BytesMessage(dest, i);
+ Message msg=new ObjectMessage(dest, i);
if(i == 5)
discard.dropDownUnicasts(1); // drops the next unicast
a.send(msg);
@@ -72,22 +73,21 @@ public void testLastMessageDropped(Class extends UNICAST3> unicast_class) thro
* https://issues.redhat.com/browse/JGRP-1563 now needs to make sure message 1 is retransmitted to B
* within a short time period, and we don't have to rely on the stable task to kick in.
*/
- @Test(dataProvider="configProvider")
- public void testFirstMessageDropped(Class extends UNICAST3> unicast_class) throws Exception {
+ @Test(dataProvider="createUnicast")
+ public void testFirstMessageDropped(Class extends Protocol> unicast_class) throws Exception {
setup(unicast_class);
-
System.out.println("**** closing all connections ****");
// close all connections, so we can start from scratch and send message A1 to B
for(JChannel ch: Arrays.asList(a,b)) {
Protocol unicast=ch.getProtocolStack().findProtocol(Util.getUnicastProtocols());
- removeAllConnections(unicast);
+ Util.invoke(unicast, "removeAllConnections");
}
setLevel("trace", a, b);
System.out.println("--> A sending first message to B (dropped before it reaches B)");
discard.dropDownUnicasts(1); // drops the next unicast
- a.send(new BytesMessage(b.getAddress(), 1));
+ a.send(new ObjectMessage(b.getAddress(), 1));
List msgs=rb.list();
try {
@@ -104,22 +104,23 @@ public void testFirstMessageDropped(Class extends UNICAST3> unicast_class) thr
}
- protected static JChannel createChannel(Class extends UNICAST3> unicast_class, String name) throws Exception {
- UNICAST3 unicast=unicast_class.getDeclaredConstructor().newInstance();
+ protected static JChannel createChannel(Class extends Protocol> unicast_class, String name) throws Exception {
+ Protocol unicast=unicast_class.getDeclaredConstructor().newInstance();
+ Util.invoke(unicast, "setXmitInterval", 500L);
return new JChannel(new SHARED_LOOPBACK(),
new SHARED_LOOPBACK_PING(),
new NAKACK2().useMcastXmit(false),
new DISCARD(),
- unicast.setXmitInterval(500),
+ unicast,
new GMS().printLocalAddress(false))
.name(name);
}
- protected void printConnectionTables(JChannel ... channels) {
+ protected static void printConnectionTables(JChannel... channels) throws Exception {
System.out.println("**** CONNECTIONS:");
for(JChannel ch: channels) {
- Protocol ucast=ch.getProtocolStack().findProtocol(Util.getUnicastProtocols());
- System.out.println(ch.getName() + ":\n" + printConnections(ucast) + "\n");
+ Protocol p=ch.stack().findProtocol(Util.getUnicastProtocols());
+ System.out.printf("%s:\n%s\n", ch.name(), Util.invoke(p, "printConnections"));
}
}
@@ -128,22 +129,5 @@ protected static void setLevel(String level, JChannel... channels) {
ch.getProtocolStack().findProtocol(Util.getUnicastProtocols()).level(level);
}
- protected String printConnections(Protocol prot) {
- if(prot instanceof UNICAST3) {
- UNICAST3 unicast=(UNICAST3)prot;
- return unicast.printConnections();
- }
- throw new IllegalArgumentException("prot (" + prot + ") needs to be UNICAST3");
- }
-
- protected static void removeAllConnections(Protocol prot) {
- if(prot instanceof UNICAST3) {
- UNICAST3 unicast=(UNICAST3)prot;
- unicast.removeAllConnections();
- }
- else
- throw new IllegalArgumentException("prot (" + prot + ") needs to be UNICAST3");
- }
-
}
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_DroppedAckTest.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_DroppedAckTest.java
index 67a66056c19..c456c51dc35 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST_DroppedAckTest.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_DroppedAckTest.java
@@ -4,6 +4,7 @@
import org.jgroups.JChannel;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
+import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
@@ -15,11 +16,11 @@
* @author Bela Ban
* @since 3.3
*/
-@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="configProvider")
public class UNICAST_DroppedAckTest {
protected JChannel a, b;
- protected void setup(Class extends UNICAST3> unicast_class) throws Exception {
+ protected void setup(Class extends Protocol> unicast_class) throws Exception {
a=createChannel(unicast_class, "A");
b=createChannel(unicast_class, "B");
a.connect("UNICAST_DroppedAckTest");
@@ -32,39 +33,41 @@ protected void setup(Class extends UNICAST3> unicast_class) throws Exception {
@DataProvider
static Object[][] configProvider() {
return new Object[][]{
- {UNICAST3.class}
+ {UNICAST3.class},
+ {UNICAST4.class}
};
}
- @Test(dataProvider="configProvider")
- public void testNotEndlessXmits(Class extends UNICAST3> unicast_class) throws Exception {
+ public void testNotEndlessXmits(Class extends Protocol> unicast_class) throws Exception {
setup(unicast_class);
-
- DISCARD discard_a=a.getProtocolStack().findProtocol(DISCARD.class);
+ DISCARD discard_a=a.stack().findProtocol(DISCARD.class);
discard_a.dropDownUnicasts(5); // drops the next 5 ACKs
for(int i=1; i <= 5; i++)
b.send(a.getAddress(), i);
- UNICAST3 ub=b.getProtocolStack().findProtocol(UNICAST3.class);
+ Protocol ub=b.getProtocolStack().findProtocol(Util.getUnicastProtocols());
for(int i=0; i < 10; i++) {
- int num_unacked_msgs=ub.getNumUnackedMessages();
+ int num_unacked_msgs=(int)Util.invoke(ub, "getNumUnackedMessages");
System.out.println("num_unacked_msgs=" + num_unacked_msgs);
if(num_unacked_msgs == 0)
break;
Util.sleep(1000);
}
- assert ub.getNumUnackedMessages() == 0 : "num_unacked_msgs on B should be 0 but is " + ub.getNumUnackedMessages();
+ assert 0 == (int)Util.invoke(ub, "getNumUnackedMessages")
+ : "num_unacked_msgs on B should be 0 but is " + Util.invoke(ub, "getNumUnackedMessages");
}
- protected static JChannel createChannel(Class extends UNICAST3> unicast_class, String name) throws Exception {
+ protected static JChannel createChannel(Class extends Protocol> unicast_class, String name) throws Exception {
+ Protocol p=unicast_class.getDeclaredConstructor().newInstance();
+ Util.invoke(p, "setXmitInterval", 500L);
return new JChannel(new SHARED_LOOPBACK(),
new SHARED_LOOPBACK_PING(),
new MERGE3().setMaxInterval(3000).setMinInterval(1000),
new NAKACK2(),
new DISCARD(),
- unicast_class.getDeclaredConstructor().newInstance().setXmitInterval(500),
+ p,
new GMS()).name(name);
}
}
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_MessagesToSelfTest.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_MessagesToSelfTest.java
index 9f4efa68ef2..ef37668b900 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST_MessagesToSelfTest.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_MessagesToSelfTest.java
@@ -18,28 +18,26 @@
/**
- * Tests the UNICAST{2,3} protocols with messages sent by member A to itself
+ * Tests the UNICAST{3,4} protocols with messages sent by member A to itself
* @author Bela Ban
*/
-@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="createUnicast")
public class UNICAST_MessagesToSelfTest {
protected JChannel ch;
protected Address a1;
-
- static final int SIZE=1000; // bytes
- static final int NUM_MSGS=10000;
-
+ static final int SIZE=1000; // bytes
+ static final int NUM_MSGS=10_000;
@AfterMethod void tearDown() throws Exception {Util.close(ch);}
@DataProvider
- static Object[][] configProvider() {
+ static Object[][] createUnicast() {
return new Object[][] {
- {new UNICAST3()}
+ {new UNICAST3()},
+ {new UNICAST4()}
};
}
- @Test(dataProvider="configProvider")
public void testReceptionOfAllMessages(Protocol prot) throws Throwable {
System.out.println("prot=" + prot.getClass().getSimpleName());
ch=createChannel(prot, null).name("A");
@@ -49,8 +47,6 @@ public void testReceptionOfAllMessages(Protocol prot) throws Throwable {
_testReceptionOfAllMessages();
}
-
- @Test(dataProvider="configProvider")
public void testReceptionOfAllMessagesWithDISCARD(Protocol prot) throws Throwable {
System.out.println("prot=" + prot.getClass().getSimpleName());
DISCARD discard=new DISCARD();
@@ -62,8 +58,6 @@ public void testReceptionOfAllMessagesWithDISCARD(Protocol prot) throws Throwabl
_testReceptionOfAllMessages();
}
-
-
private static byte[] createPayload(int size, int seqno) {
return ByteBuffer.allocate(size).putInt(seqno).array();
}
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java
index 36b303219d9..11140eac6ff 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java
@@ -6,13 +6,12 @@
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.MyReceiver;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
/**
@@ -20,97 +19,127 @@
* https://issues.redhat.com/browse/JGRP-2327
* @author Bela Ban
*/
-@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="createUnicast")
public class UNICAST_OOB_Test {
protected JChannel a, b;
-
protected static final long XMIT_INTERVAL=500;
- @BeforeMethod
- void setup() throws Exception {
- a=createChannel("A");
- b=createChannel("B");
+ @DataProvider
+ static Object[][] createUnicast() {
+ return new Object[][]{
+ {UNICAST3.class},
+ {UNICAST4.class}
+ };
+ }
+
+ protected void setup(Class extends Protocol> unicast_class) throws Exception {
+ a=createChannel("A", unicast_class);
+ b=createChannel("B", unicast_class);
a.connect("UNICAST_OOB_Test");
b.connect("UNICAST_OOB_Test");
+ Util.waitUntilAllChannelsHaveSameView(3000, 100, a,b);
+ System.out.printf("-- cluster formed: %s\n", b.view());
}
@AfterMethod
- void tearDown() throws Exception {
- Util.close(b,a);
- }
+ protected void tearDown() throws Exception {Util.close(b,a);}
- public void testRegularMessages() throws Exception {
+ public void testRegularMessages(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
sendMessages(false);
}
- public void testOutOfBandMessages() throws Exception {
+ public void testOutOfBandMessages(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
sendMessages(true);
}
-
/**
Tests the case where B sends B1 and B2, but A receives B2 first (discards it) and requests retransmission of B1.
JIRA: https://issues.redhat.com/browse/JGRP-2327
*/
- public void testSecondMessageReceivedFirstRegular() throws Exception {
+ public void testSecondMessageReceivedFirstRegular(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
_testSecondMessageReceivedFirst(false, false);
}
- public void testSecondMessageReceivedFirstRegularBatched() throws Exception {
- _testSecondMessageReceivedFirst(false, true);
+ public void testSecondMessageReceivedFirstRegularBatched(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
+ _testSecondMessageReceivedFirst(false, true);
}
- public void testSecondMessageReceivedFirstOOB() throws Exception {
+ public void testSecondMessageReceivedFirstOOB(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
_testSecondMessageReceivedFirst(true, false);
}
- public void testSecondMessageReceivedFirstOOBBatched() throws Exception {
+ // @Test(invocationCount=100)
+ public void testSecondMessageReceivedFirstOOBBatched(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
_testSecondMessageReceivedFirst(true, true);
}
protected void _testSecondMessageReceivedFirst(boolean oob, boolean use_batches) throws Exception {
Address dest=a.getAddress(), src=b.getAddress();
- UNICAST3 u_a=a.getProtocolStack().findProtocol(UNICAST3.class), u_b=b.getProtocolStack().findProtocol(UNICAST3.class);
- u_a.removeReceiveConnection(src);
- u_a.removeSendConnection(src);
- u_b.removeReceiveConnection(dest);
- u_b.removeSendConnection(dest);
- System.out.println("=============== removed connection between A and B ===========");
+ Protocol u_a=a.getProtocolStack().findProtocol(Util.getUnicastProtocols()),
+ u_b=b.getProtocolStack().findProtocol(Util.getUnicastProtocols());
+ for(int i=0; i < 10; i++) {
+ Util.invoke(u_a, "removeReceiveConnection", src);
+ Util.invoke(u_a, "removeSendConnection", src);
+ Util.invoke(u_b, "removeReceiveConnection", dest);
+ Util.invoke(u_b, "removeSendConnection", dest);
+ int num_connections=(int)Util.invoke(u_a, "getNumConnections") +
+ (int)Util.invoke(u_b, "getNumConnections");
+ if(num_connections == 0)
+ break;
+ Util.sleep(100);
+ }
+ System.out.println("=============== removed connections between A and B ===========");
- REVERSE reverse=new REVERSE().numMessagesToReverse(5)
- .filter(msg -> msg.getDest() != null && src.equals(msg.getSrc()) && (msg.getFlags(false) == 0 || msg.isFlagSet(Message.Flag.OOB)));
- a.getProtocolStack().insertProtocol(reverse, ProtocolStack.Position.BELOW, UNICAST3.class);
+ Protocol reverse=new REVERSE().numMessagesToReverse(5)
+ .filter(msg -> msg.getDest() != null && src.equals(msg.src()) && (msg.getFlags(false) == 0 || msg.isFlagSet(Message.Flag.OOB)));
+ // REVERSE2 reverse=new REVERSE2().filter(m -> m.dest() != null && m.isFlagSet(Message.Flag.OOB) && src.equals(m.src()));
+ a.getProtocolStack().insertProtocol(reverse, ProtocolStack.Position.BELOW, UNICAST3.class,UNICAST4.class);
if(use_batches) {
MAKE_BATCH mb=new MAKE_BATCH().unicasts(true).setAddress(dest);
- a.getProtocolStack().insertProtocol(mb, ProtocolStack.Position.BELOW, UNICAST3.class);
+ a.getProtocolStack().insertProtocol(mb, ProtocolStack.Position.BELOW, UNICAST3.class,UNICAST4.class);
mb.start();
}
- MyReceiver r=new MyReceiver();
+ MyReceiver r=new MyReceiver().name(a.getName()).verbose(true);
a.setReceiver(r);
- System.out.println("========== B sends messages 1-5 to A ==========");
+ System.out.printf("========== B sending %s messages 1-5 to A ==========\n", oob? "OOB" : "regular");
+ //u_a.setLevel("trace"); u_b.setLevel("trace");
+
long start=System.currentTimeMillis();
for(int i=1; i <= 5; i++) {
- Message msg=new BytesMessage(dest, (long)i);
+ Message msg=new ObjectMessage(dest, (long)i);
if(oob) msg.setFlag(Message.Flag.OOB);
b.send(msg);
+ System.out.printf("-- %s: sent %s, hdrs: %s\n", b.address(), msg, msg.printHeaders());
}
- Util.waitUntil(10000, 10, () -> r.size() == 5);
+ if(reverse instanceof REVERSE2) {
+ REVERSE2 rr=((REVERSE2)reverse);
+ Util.waitUntilTrue(2000, 100, () -> rr.size() == 5);
+ rr.filter(null); // from now on, all msgs are passed up
+ rr.deliver();
+ }
+
+ Util.waitUntil(5000, 100, () -> r.size() == 5,
+ () -> String.format("expected 5 messages but got %s", r.list()));
long time=System.currentTimeMillis() - start;
- System.out.printf("===== list: %s (in %d ms)\n", r.getSeqnos(), time);
+ System.out.printf("===== list: %s (in %d ms)\n", r.list(), time);
long expected_time=XMIT_INTERVAL * 10; // increased because times might increase with the increase in parallel tests
- assert time < XMIT_INTERVAL *2 : String.format("expected a time < %d ms, but got %d ms", expected_time, time);
+ assert time < expected_time : String.format("expected a time < %d ms, but got %d ms", expected_time, time);
}
- /**
- * Check that 4 is received before 3
- */
+ /** Check that 4 is received before 3 */
private void sendMessages(boolean oob) throws Exception {
DISCARD_PAYLOAD discard=new DISCARD_PAYLOAD();
- MyReceiver receiver=new MyReceiver();
+ MyReceiver receiver=new MyReceiver<>();
b.setReceiver(receiver);
// the first channel will discard the unicast messages with seqno #3 two times, the let them pass down
@@ -127,20 +156,12 @@ private void sendMessages(boolean oob) throws Exception {
msg.setFlag(Message.Flag.OOB);
System.out.println("-- sending message #" + i);
a.send(msg);
- Util.sleep(100);
}
// wait until retransmission of seqno #3 happens, so that 4 and 5 are received as well
- long target_time=System.currentTimeMillis() + 30000;
- do {
- if(receiver.size() >= 5)
- break;
- Util.sleep(500);
- }
- while(target_time > System.currentTimeMillis());
+ Util.waitUntilTrue(3000, 500, () -> receiver.size() >= 5);
-
- List seqnos=receiver.getSeqnos();
+ List seqnos=receiver.list();
System.out.println("-- sequence numbers: " + seqnos);
assert seqnos.size() == 5;
@@ -161,40 +182,18 @@ private void sendMessages(boolean oob) throws Exception {
}
}
-
- protected static JChannel createChannel(String name) throws Exception {
+ protected static JChannel createChannel(String name, Class extends Protocol> unicast_class) throws Exception {
+ Protocol p=unicast_class.getConstructor().newInstance();
+ Util.invoke(p, "setXmitInterval", XMIT_INTERVAL);
return new JChannel(
- new SHARED_LOOPBACK(),
+ new SHARED_LOOPBACK(), // .bundler("nb"),
new SHARED_LOOPBACK_PING(),
new NAKACK2(),
- new UNICAST3().setXmitInterval(XMIT_INTERVAL),
- new GMS())
+ p,
+ new GMS().printLocalAddress(false).setViewAckCollectionTimeout(100))
.name(name);
}
- public static class MyReceiver implements Receiver {
- /** List of unicast sequence numbers */
- List seqnos=Collections.synchronizedList(new LinkedList<>());
-
- public MyReceiver() {
- }
-
- public List getSeqnos() {
- return seqnos;
- }
-
- public void receive(Message msg) {
- if(msg != null) {
- Long num=msg.getObject();
- System.out.println(">> received " + num);
- seqnos.add(num);
- }
- }
-
- public int size() {return seqnos.size();}
- }
-
-
}
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_RetransmitTest.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_RetransmitTest.java
index edf747da09e..fb94ac5f197 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST_RetransmitTest.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_RetransmitTest.java
@@ -1,14 +1,17 @@
package org.jgroups.protocols;
-import org.jgroups.*;
+import org.jgroups.Address;
+import org.jgroups.Global;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.MyReceiver;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -16,14 +19,21 @@
* @author Bela Ban
* @since 3.6
*/
-@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="createUnicast")
public class UNICAST_RetransmitTest {
protected JChannel a, b;
protected static final int MAX_BUNDLE_SIZE=10000;
protected static final int NUM_MSGS=50000;
- @BeforeMethod
- protected void setup() throws Exception {
+ @DataProvider
+ static Object[][] createUnicast() {
+ return new Object[][]{
+ {UNICAST3.class},
+ {UNICAST4.class}
+ };
+ }
+
+ protected void setup(Class extends Protocol> unicast_class) throws Exception {
a=new JChannel(Util.getTestStack()).name("A");
b=new JChannel(Util.getTestStack()).name("B");
change(a, b);
@@ -37,17 +47,18 @@ protected void setup() throws Exception {
/**
- * Sends a number of messages, but discards every other message. The retransmission task in UNICAST3 is initially
+ * Sends a number of messages, but discards every other message. The retransmission task in UNICAST3/4 is initially
* disabled. Then starts the retransmission task, which should generate an XMIT-REQ which is larger than
* TP.max_bundle_size, leading to endless retransmissions. With JGRP-1868 resolved, the receiver should get
* all messages.
*
* https://issues.redhat.com/browse/JGRP-1868
*/
- public void testLargeRetransmission() throws Exception {
- MyReceiver receiver=new MyReceiver();
+ public void testLargeRetransmission(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
+ MyReceiver receiver=new MyReceiver<>();
b.setReceiver(receiver);
- List list=receiver.getList();
+ List list=receiver.list();
stopRetransmission(a, b);
@@ -72,43 +83,28 @@ public void testLargeRetransmission() throws Exception {
setLevel("warn", a,b);
}
-
-
- protected static void change(JChannel ... channels) {
+ protected static void change(JChannel ... channels) throws Exception {
for(JChannel ch: channels) {
TP transport=ch.getProtocolStack().getTransport();
transport.getBundler().setMaxSize(MAX_BUNDLE_SIZE);
- UNICAST3 ucast=ch.getProtocolStack().findProtocol(UNICAST3.class);
+ Protocol ucast=ch.getProtocolStack().findProtocol(UNICAST3.class,UNICAST4.class);
if(ucast == null)
- throw new IllegalStateException("UNICAST3 not present in the stack");
- ucast.setMaxXmitReqSize(5000);
+ throw new IllegalStateException("UNICAST prototocol not found in the stack");
+ Util.invoke(ucast, "setMaxXmitReqSize", 5000);
}
}
-
- protected static class MyReceiver implements Receiver {
- protected final List list=new ArrayList<>();
-
- public void receive(Message msg) {
- Integer num=msg.getObject();
- list.add(num);
- }
-
- public List getList() {return list;}
- }
-
-
- protected static void stopRetransmission(JChannel... channels) {
+ protected static void stopRetransmission(JChannel... channels) throws Exception {
for(JChannel ch: channels) {
- UNICAST3 ucast=ch.getProtocolStack().findProtocol(UNICAST3.class);
- ucast.stopRetransmitTask();
+ UNICAST3 ucast=ch.getProtocolStack().findProtocol(UNICAST3.class, UNICAST4.class);
+ Util.invoke(ucast, "stopRetransmitTask");
}
}
- protected static void startRetransmission(JChannel... channels) {
+ protected static void startRetransmission(JChannel... channels) throws Exception {
for(JChannel ch: channels) {
- UNICAST3 ucast=ch.getProtocolStack().findProtocol(UNICAST3.class);
- ucast.startRetransmitTask();
+ Protocol ucast=ch.getProtocolStack().findProtocol(UNICAST3.class, UNICAST4.class);
+ Util.invoke(ucast, "startRetransmitTask");
}
}
@@ -123,7 +119,7 @@ protected static void removeDiscardProtocol(JChannel ch) {
protected static void setLevel(String level, JChannel ... channels) {
for(JChannel ch: channels) {
- Protocol prot=ch.getProtocolStack().findProtocol(UNICAST3.class);
+ Protocol prot=ch.getProtocolStack().findProtocol(UNICAST3.class,UNICAST4.class);
prot.level(level);
}
}
diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST3_Test.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_Test.java
similarity index 60%
rename from tests/junit-functional/org/jgroups/protocols/UNICAST3_Test.java
rename to tests/junit-functional/org/jgroups/protocols/UNICAST_Test.java
index 0eede490b59..13d6589092e 100644
--- a/tests/junit-functional/org/jgroups/protocols/UNICAST3_Test.java
+++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_Test.java
@@ -7,35 +7,39 @@
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
* @author Bela Ban
* @since 4.0
*/
-@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
-public class UNICAST3_Test {
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="createUnicast")
+public class UNICAST_Test {
protected JChannel a, b;
- protected Address a_addr, b_addr;
- protected MyReceiver receiver=new MyReceiver();
- protected UNICAST3 uni_a, uni_b;
- protected DropUnicastAck drop_ack=new DropUnicastAck((short)499);
+ protected Protocol uni_a, uni_b;
protected static final short UNICAST3_ID=ClassConfigurator.getProtocolId(UNICAST3.class);
+ protected static final short UNICAST4_ID=ClassConfigurator.getProtocolId(UNICAST4.class);
protected static final int CONN_CLOSE_TIMEOUT=60_000; // change to a low value (e.g. 1000) to make this test fail
- @BeforeMethod protected void setup() throws Exception {
- a=create("A").connect(getClass().getSimpleName());
- b=create("B").connect(getClass().getSimpleName());
- a_addr=a.getAddress();
- b_addr=b.getAddress();
- uni_a=a.getProtocolStack().findProtocol(UNICAST3.class);
- uni_b=b.getProtocolStack().findProtocol(UNICAST3.class);
- b.getProtocolStack().insertProtocol(drop_ack, ProtocolStack.Position.BELOW, UNICAST3.class);
+ @DataProvider
+ static Object[][] createUnicast() {
+ return new Object[][]{
+ {UNICAST3.class},
+ {UNICAST4.class}
+ };
}
- @AfterMethod protected void destroy() {Util.close(b, a);}
+ protected void setup(Class extends Protocol> unicast_class) throws Exception {
+ a=create("A", unicast_class).connect(getClass().getSimpleName());
+ b=create("B", unicast_class).connect(getClass().getSimpleName());
+ uni_a=a.getProtocolStack().findProtocol(UNICAST3.class,UNICAST4.class);
+ uni_b=b.getProtocolStack().findProtocol(UNICAST3.class,UNICAST4.class);
+ DropUnicastAck drop_ack=new DropUnicastAck((short)499);
+ b.getProtocolStack().insertProtocol(drop_ack, ProtocolStack.Position.BELOW, UNICAST3.class,ReliableUnicast.class);
+ }
+ @AfterMethod protected void destroy() {Util.close(b, a);}
/**
- A and B exchanging unicast messages
@@ -43,8 +47,8 @@ public class UNICAST3_Test {
- B adds it to its table for A, and sends the ACK, then delivers the message
- The ack for 499 from B to A is dropped
- B excludes A (but A doesn't exclude B) and removes A's table
- * This happens only if conn_close_timeout is small (default: 10s)
- * If conn_close_timeout == 0, connections will not be removed
+ * This happens only if conn_close_timeout is small (default: 10s)
+ * If conn_close_timeout == 0, connections will not be removed
- A retransmits 499 to B
- B receives A:499, but asks for the first seqno
- A has its highest seqno acked at 498, so resends 499 with first==true
@@ -52,54 +56,48 @@ public class UNICAST3_Test {
The issue is fixed by setting CONN_CLOSE_TIMEOUT to a highher value, or to 0
*/
- public void testDuplicateMessageDelivery() throws Exception {
+ public void testDuplicateMessageDelivery(Class extends Protocol> unicast_class) throws Exception {
+ setup(unicast_class);
+ Address a_addr=a.getAddress();
+ Address b_addr=b.getAddress();
+ MyReceiver receiver=new MyReceiver();
b.setReceiver(receiver);
for(int i=1; i < 500; i++)
a.send(b_addr, i);
- for(int i=0; i < 10; i++) {
- if(receiver.count >= 499)
- break;
- Util.sleep(50);
- }
+ Util.waitUntilTrue(500, 50, () -> receiver.count >= 499);
System.out.printf("B: received %d messages from A\n", receiver.count);
assert receiver.count == 499;
// remove A's receive window in B:
System.out.printf("-- closing the receive-window for %s:\n", a_addr);
// e.g. caused by an asymmetric network split: B excludes A, but not vice versa
- uni_b.closeReceiveConnection(a_addr);
-
+ Util.invoke(uni_b, "closeReceiveConnection", a_addr);
uni_a.setLevel("trace");
uni_b.setLevel("trace");
- uni_b.setConnCloseTimeout(CONN_CLOSE_TIMEOUT);
+ Util.invoke(uni_b, "setConnCloseTimeout", CONN_CLOSE_TIMEOUT);
// wait until B closes the receive window for A:
for(int i=0; i < 10; i++) {
- if(uni_b.getNumReceiveConnections() == 0)
+ int num_recv_conns=(int)Util.invoke(uni_b, "getNumReceiveConnections");
+ if(num_recv_conns == 0)
break;
Util.sleep(500);
}
- // assert uni_b.getNumReceiveConnections() > 0;
-
// remove the DropUnicastAck protocol:
System.out.printf("-- removing the %s protocol\n", DropUnicastAck.class.getSimpleName());
b.getProtocolStack().removeProtocol(DropUnicastAck.class);
- for(int i=0; i < 20; i++) {
- if(receiver.count >= 500)
- break;
- Util.sleep(100);
- }
+ Util.waitUntilTrue(2000, 200, () -> receiver.count >= 500);
System.out.printf("B: received %d messages from A\n", receiver.count);
assert receiver.count == 499 : String.format("received %d messages, but should only have received 499", receiver.count);
}
-
- protected static JChannel create(String name) throws Exception {
- return new JChannel(new SHARED_LOOPBACK(), new SHARED_LOOPBACK_PING(), new UNICAST3()).name(name);
+ protected static JChannel create(String name, Class extends Protocol> unicast_class) throws Exception {
+ Protocol ucast=unicast_class.getDeclaredConstructor().newInstance();
+ return new JChannel(new SHARED_LOOPBACK(), new SHARED_LOOPBACK_PING(), ucast).name(name);
}
/**
@@ -116,10 +114,20 @@ public DropUnicastAck(short start_drop_ack) {
}
public Object down(Message msg) {
- UnicastHeader3 hdr=msg.getHeader(UNICAST3_ID);
- if(hdr != null && hdr.type() == UnicastHeader3.ACK && hdr.seqno() == start_drop_ack) {
- discarding=true;
- hdr.seqno=start_drop_ack-1; // change 499 to 489, so A nevers gets the 499 ACK
+ UnicastHeader3 hdr3=msg.getHeader(UNICAST3_ID);
+ UnicastHeader hdr4=msg.getHeader(UNICAST4_ID);
+
+ if(hdr3 != null) {
+ if(hdr3.type() == UnicastHeader3.ACK && hdr3.seqno() == start_drop_ack) {
+ discarding=true;
+ hdr3.seqno=start_drop_ack-1; // change 499 to 489, so A nevers gets the 499 ACK
+ }
+ }
+ else {
+ if(hdr4.type() == UnicastHeader.ACK && hdr4.seqno() == start_drop_ack) {
+ discarding=true;
+ hdr4.seqno=start_drop_ack-1; // change 499 to 489, so A nevers gets the 499 ACK
+ }
}
return down_prot.down(msg);
}
diff --git a/tests/junit-functional/org/jgroups/tests/AverageTest.java b/tests/junit-functional/org/jgroups/tests/AverageTest.java
index 1f6d0fdc529..df85805ab4f 100644
--- a/tests/junit-functional/org/jgroups/tests/AverageTest.java
+++ b/tests/junit-functional/org/jgroups/tests/AverageTest.java
@@ -3,9 +3,11 @@
import org.jgroups.Global;
import org.jgroups.util.Average;
import org.jgroups.util.AverageMinMax;
+import org.jgroups.util.ByteArray;
import org.jgroups.util.Util;
import org.testng.annotations.Test;
+import java.io.IOException;
import java.util.stream.IntStream;
/**
@@ -81,7 +83,7 @@ public void testMerge() {
assert avg1.max() == 2;
}
- public void testMerger2() {
+ public void testMerge2() {
AverageMinMax avg1=new AverageMinMax(10000), avg2=new AverageMinMax(10000);
IntStream.rangeClosed(1, 10000).forEach(i -> avg2.add(2));
System.out.printf("avg1: %s, avg2: %s\n", avg1, avg2);
@@ -93,10 +95,34 @@ public void testMerger2() {
assert avg1.max() == 2;
}
+ public void testMerge3() {
+ AverageMinMax avg1=new AverageMinMax(100), avg2=new AverageMinMax(200);
+ IntStream.rangeClosed(1, 100).forEach(i -> avg1.add(1));
+ IntStream.rangeClosed(1, 200).forEach(i -> avg2.add(2));
+ System.out.printf("avg1: %s, avg2: %s\n", avg1, avg2);
+ avg1.merge(avg2);
+ System.out.printf("merged avg1: %s\n", avg1);
+ assert avg1.count() == 300;
+ assert avg1.average() == 2.0;
+ assert avg1.min() == 1;
+ assert avg1.max() == 2;
+ }
+
public void testAverageWithNoElements() {
Average avg=new AverageMinMax();
double av=avg.average();
assert av == 0.0;
}
+ public void testSerialization() throws IOException, ClassNotFoundException {
+ Average avg=new Average(128);
+ for(int i=0; i < 100; i++)
+ avg.add(Util.random(128));
+ ByteArray buf=Util.objectToBuffer(avg);
+ Average avg2=Util.objectFromBuffer(buf, null);
+ assert avg2 != null;
+ assert avg.count() == avg2.count();
+ assert avg.average() == avg2.average();
+ }
+
}
diff --git a/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java b/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java
index 49bc69649fc..b2d8cfd8692 100644
--- a/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java
+++ b/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java
@@ -22,8 +22,6 @@ public class BatchMessageTest extends MessageTestBase {
protected static final Message M2=create(DEST, 1000, true, true);
protected static final Message M3=new EmptyMessage(DEST);
- protected static final MessageFactory MF=new DefaultMessageFactory();
-
public void testCreation() {
BatchMessage msg=new BatchMessage(DEST, SRC, new Message[]{M1,M2,M3}, 3);
assert msg.getNumberOfMessages() == 3;
diff --git a/tests/junit-functional/org/jgroups/tests/BufferTest.java b/tests/junit-functional/org/jgroups/tests/BufferTest.java
index d9befaee8c0..92353cd4185 100644
--- a/tests/junit-functional/org/jgroups/tests/BufferTest.java
+++ b/tests/junit-functional/org/jgroups/tests/BufferTest.java
@@ -226,7 +226,7 @@ public void testAdditionMessageBatchWithOffset(Buffer type) {
assertIndices(buf, 100, 100, 129);
}
- public void testAddListWithResizing(Buffer type) {
+ public void testAddBatchWithResizing(Buffer type) {
if(type instanceof FixedBuffer)
return;
DynamicBuffer buf=new DynamicBuffer<>(3, 5, 0);
@@ -252,7 +252,7 @@ public void testAddMessageBatchWithResizing(Buffer type) {
}
- public void testAddListWithResizingNegativeSeqnos(Buffer type) {
+ public void testAddBatchWithResizingNegativeSeqnos(Buffer type) {
if(type instanceof FixedBuffer)
return;
long seqno=Long.MAX_VALUE-50;
@@ -265,7 +265,7 @@ public void testAddListWithResizingNegativeSeqnos(Buffer type) {
assert num_resizes == 1 : "number of resizings=" + num_resizes + " (expected 1)";
}
- public void testAddListWithResizing2(Buffer type) {
+ public void testAddBatchWithResizing2(Buffer type) {
if(type instanceof FixedBuffer)
return;
DynamicBuffer buf=new DynamicBuffer<>(3, 500, 0);
@@ -410,7 +410,6 @@ public void testAddMissing(Buffer buf) {
assert num == null;
}
-
public void testDuplicateAddition(Buffer buf) {
addAndGet(buf, 1, 5, 9, 10);
assert !buf.add(5,5);
@@ -517,6 +516,92 @@ public void testAddAndRemove4(Buffer type) {
assert buf.highestDelivered() == 4;
}
+ public void testAddListWithConstValue(Buffer buf) {
+ List> msgs=createList(1,2,3,4,5,6,7,8,9,10);
+ final int DUMMY=0;
+ boolean rc=buf.add(msgs, false, DUMMY);
+ System.out.println("buf = " + buf);
+ assert rc;
+ assert buf.size() == 10;
+ List list=buf.removeMany(true, 0, element -> element.hashCode() == Integer.hashCode(DUMMY));
+ System.out.println("list = " + list);
+ assert list.size() == 10;
+ assert buf.isEmpty();
+ for(int num: list)
+ assert num == DUMMY;
+ }
+
+ public void testAddListWithResizingNegativeSeqnos(Buffer type) {
+ long seqno=Long.MAX_VALUE-50;
+ Buffer buf=type instanceof DynamicBuffer? new DynamicBuffer<>(3,5,seqno) : new FixedBuffer<>(100, seqno);
+ List> msgs=new ArrayList<>();
+ for(int i=1; i < 100; i++)
+ msgs.add(new LongTuple<>((long)i+seqno,i));
+ buf.add(msgs, false, null);
+ System.out.println("buf = " + buf);
+ if(type instanceof DynamicBuffer) {
+ int num_resizes=((DynamicBuffer>)buf).getNumResizes();
+ System.out.println("num_resizes = " + num_resizes);
+ assert num_resizes == 1 : "number of resizings=" + num_resizes + " (expected 1)";
+ }
+ }
+
+ public void testAddListWithRemoval2(Buffer buf) {
+ List> msgs=createList(1,2,3,4,5,6,7,8,9,10);
+ int size=msgs.size();
+ boolean added=buf.add(msgs, false, null);
+ System.out.println("buf = " + buf);
+ assert added;
+ assert msgs.size() == size;
+
+ added=buf.add(msgs, true, null);
+ System.out.println("buf = " + buf);
+ assert !added;
+ assert msgs.isEmpty();
+
+ msgs=createList(1,3,5,7);
+ size=msgs.size();
+ added=buf.add(msgs, true, null);
+ System.out.println("buf = " + buf);
+ assert !added;
+ assert msgs.isEmpty();
+
+ msgs=createList(1,3,5,7,9,10,11,12,13,14,15);
+ size=msgs.size();
+ added=buf.add(msgs, true, null);
+ System.out.println("buf = " + buf);
+ assert added;
+ assert msgs.size() == 5;
+ }
+
+ public void testAddListWithResizing2(Buffer type) {
+ Buffer buf=type instanceof DynamicBuffer? new DynamicBuffer<>() : new FixedBuffer<>(100, 0);
+ List> msgs=new ArrayList<>();
+ for(int i=1; i < 100; i++)
+ msgs.add(new LongTuple<>(i, i));
+ buf.add(msgs, false, null);
+ System.out.println("buf = " + buf);
+ if(buf instanceof DynamicBuffer) {
+ int num_resizes=((DynamicBuffer>)buf).getNumResizes();
+ System.out.println("num_resizes = " + num_resizes);
+ assert num_resizes == 0 : "number of resizings=" + num_resizes + " (expected 0)";
+ }
+ }
+
+
+ public void testAddListWithResizing(Buffer type) {
+ Buffer buf=type instanceof DynamicBuffer? new DynamicBuffer<>(3,5,0) : new FixedBuffer<>(100, 0);
+ List> msgs=new ArrayList<>();
+ for(int i=1; i < 100; i++)
+ msgs.add(new LongTuple<>(i, i));
+ buf.add(msgs, false, null);
+ System.out.println("buf = " + buf);
+ if(buf instanceof DynamicBuffer) {
+ int num_resizes=((DynamicBuffer>)buf).getNumResizes();
+ System.out.println("num_resizes = " + num_resizes);
+ assert num_resizes == 1 : "number of resizings=" + num_resizes + " (expected 1)";
+ }
+ }
public void testIndex(Buffer type) {
Buffer buf=type instanceof DynamicBuffer? new DynamicBuffer<>(3, 10, 5)
diff --git a/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java b/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java
index aba3d106ff7..3b034be00cc 100644
--- a/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java
+++ b/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java
@@ -21,8 +21,6 @@ public class CompositeMessageTest extends MessageTestBase {
protected static final Message M2=create(DEST, 1000, true, true);
protected static final Message M3=new EmptyMessage(DEST);
- protected static final MessageFactory MF=new DefaultMessageFactory();
-
public void testCreation() {
CompositeMessage msg=new CompositeMessage(DEST, M1, M2);
assert msg.getNumberOfMessages() == 2;
@@ -62,7 +60,7 @@ public void testCollapse() throws Exception {
CompositeMessage msg=new CompositeMessage(DEST, M1, M2, M3).collapse(true);
int length=msg.getLength();
ByteArray buf=Util.messageToBuffer(msg);
- Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength(), MF);
+ Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength());
assert msg2 instanceof BytesMessage;
assert msg2.getLength() == length;
}
@@ -75,7 +73,7 @@ public void testCollapse2() throws Exception {
.collapse(true);
int length=msg.getLength();
ByteArray buf=Util.messageToBuffer(msg);
- Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength(), MF);
+ Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength());
assert msg2 instanceof BytesMessage;
assert msg2.getLength() == length;
diff --git a/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java b/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java
index 9df6db3bfe3..dfde70a9875 100644
--- a/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java
+++ b/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java
@@ -17,7 +17,6 @@
@Test(groups=Global.FUNCTIONAL)
public class FragmentedMessageTest {
protected static final int FRAG_SIZE=500;
- protected final MessageFactory msg_factory=new DefaultMessageFactory();
protected final byte[] array=Util.generateArray(1200);
protected final Address src=Util.createRandomAddress("X"), dest=Util.createRandomAddress("D");
@@ -109,7 +108,7 @@ protected void _testFragmentation(Message original_msg, Consumer verifi
new SequenceInputStream(Util.enumerate(msgs, 0, msgs.length,
m -> new ByteArrayDataInputStream(m.getArray(),m.getOffset(),m.getLength())));
DataInput input=new DataInputStream(seq);
- Message new_msg=msg_factory.create(original_msg.getType());
+ Message new_msg=MessageFactory.create(original_msg.getType());
new_msg.readFrom(input);
assert original_msg.getLength() == new_msg.getLength();
verifier.accept(new_msg);
diff --git a/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java b/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java
index e8b538b1493..4e990eae3a1 100644
--- a/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java
+++ b/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java
@@ -378,7 +378,6 @@ public void testTotalSize() {
public void testSize() throws Exception {
- MessageFactory mf=new DefaultMessageFactory();
List msgs=createMessages();
ByteArrayOutputStream output=new ByteArrayOutputStream();
DataOutputStream out=new DataOutputStream(output);
@@ -391,7 +390,7 @@ public void testSize() throws Exception {
DataInputStream in=new DataInputStream(new ByteArrayInputStream(buf));
in.readShort(); // version
in.readByte(); // flags
- List list=Util.readMessageList(in, UDP_ID, mf);
+ List list=Util.readMessageList(in, UDP_ID);
assert msgs.size() == list.size();
}
diff --git a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java
index 48778989596..2154d9b5328 100644
--- a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java
+++ b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java
@@ -15,21 +15,20 @@
*/
@Test(groups=Global.FUNCTIONAL)
public class MessageFactoryTest {
- protected final MessageFactory mf=new DefaultMessageFactory();
public void testRegistration() {
for(int i=0; i < 32; i++) {
try {
- mf.register((short)i, MyMessageFactory::new);
+ MessageFactory.register((short)i, MyMessageFactory::new);
}
catch(IllegalArgumentException ex) {
System.out.printf("received exception (as expected): %s\n", ex);
}
}
- mf.register((short)32, MyMessageFactory::new);
+ MessageFactory.register((short)32, MyMessageFactory::new);
try {
- mf.register((short)32, MyMessageFactory::new);
+ MessageFactory.register((short)32, MyMessageFactory::new);
}
catch(IllegalArgumentException ex) {
System.out.printf("received exception (as expected): %s\n", ex);
diff --git a/tests/junit-functional/org/jgroups/tests/NioMessageTest.java b/tests/junit-functional/org/jgroups/tests/NioMessageTest.java
index b790c786c8b..bd478e1bd1f 100644
--- a/tests/junit-functional/org/jgroups/tests/NioMessageTest.java
+++ b/tests/junit-functional/org/jgroups/tests/NioMessageTest.java
@@ -1,6 +1,5 @@
package org.jgroups.tests;
-import org.jgroups.DefaultMessageFactory;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.NioMessage;
@@ -236,7 +235,7 @@ public void testReadonly() throws Exception {
ByteBuffer payload=ByteBuffer.allocate(4).putInt(322649).flip().asReadOnlyBuffer();
Message msg=new NioMessage(null, payload);
ByteArray buf=Util.messageToBuffer(msg);
- NioMessage msg2=(NioMessage)Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength(), new DefaultMessageFactory());
+ NioMessage msg2=(NioMessage)Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength());
ByteBuffer buf2=msg2.getBuf();
assert payload.equals(buf2);
diff --git a/tests/junit-functional/org/jgroups/tests/OrderingTest.java b/tests/junit-functional/org/jgroups/tests/OrderingTest.java
index e5fce283d9f..f5c5464be89 100644
--- a/tests/junit-functional/org/jgroups/tests/OrderingTest.java
+++ b/tests/junit-functional/org/jgroups/tests/OrderingTest.java
@@ -60,10 +60,10 @@ protected static JChannel createChannel(int index) throws Exception {
new SHUFFLE().setUp(false).setDown(false).setMaxSize(200), // reorders messages
new NAKACK2().useMcastXmit(false).setDiscardDeliveredMsgs(true).setXmitInterval(200),
new UNICAST3(),
- new STABLE().setMaxBytes(50000).setDesiredAverageGossip(1000),
+ new STABLE().setMaxBytes(50_000).setDesiredAverageGossip(1000),
new GMS().setJoinTimeout(500).printLocalAddress(false),
- new UFC().setMaxCredits(2000000),
- new MFC().setMaxCredits(2000000),
+ new UFC().setMaxCredits(2_000_000),
+ new MFC().setMaxCredits(2_000_000),
new FRAG2())
.name(String.valueOf((char)('A' +index)));
}
@@ -114,20 +114,9 @@ protected void checkOrder(int expected_msgs) {
}
System.out.println("\n-- waiting for message reception by all receivers:");
- for(int i=0; i < 20; i++) {
- boolean done=true;
- for(JChannel ch: channels) {
- MyReceiver receiver=(MyReceiver)ch.getReceiver();
- int received=receiver.getReceived();
- if(received != expected_msgs) {
- done=false;
- break;
- }
- }
- if(done)
- break;
- Util.sleep(500);
- }
+ Util.waitUntilTrue(10000, 500,
+ () -> Stream.of(channels).map(JChannel::getReceiver)
+ .allMatch(r -> ((MyReceiver)r).getReceived() == expected_msgs));
Stream.of(channels).forEach(ch -> System.out.printf("%s: %d\n", ch.getAddress(),
((MyReceiver)ch.getReceiver()).getReceived()));
diff --git a/tests/junit-functional/org/jgroups/tests/ReliableUnicastTest.java b/tests/junit-functional/org/jgroups/tests/ReliableUnicastTest.java
new file mode 100644
index 00000000000..213bfd89042
--- /dev/null
+++ b/tests/junit-functional/org/jgroups/tests/ReliableUnicastTest.java
@@ -0,0 +1,368 @@
+package org.jgroups.tests;
+
+import org.jgroups.Address;
+import org.jgroups.Global;
+import org.jgroups.Message;
+import org.jgroups.ObjectMessage;
+import org.jgroups.protocols.ReliableUnicast;
+import org.jgroups.protocols.TP;
+import org.jgroups.protocols.UNICAST4;
+import org.jgroups.protocols.UnicastHeader;
+import org.jgroups.stack.Protocol;
+import org.jgroups.util.*;
+import org.testng.annotations.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.jgroups.util.MessageBatch.Mode.OOB;
+import static org.jgroups.util.MessageBatch.Mode.REG;
+
+/**
+ * Tests {@link ReliableUnicast}, ie. methods
+ * {@link ReliableUnicast#_getReceiverEntry(Address, long, boolean, short, Address)} and
+ * {@link ReliableUnicast#up(MessageBatch)}
+ * @author Bela Ban
+ * @since 5.4
+ */
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="createUnicast")
+public class ReliableUnicastTest {
+ protected ReliableUnicast unicast;
+ protected UpProtocol up_prot;
+ protected DownProtocol down_prot;
+ protected TP transport;
+ protected TimeScheduler timer;
+ protected static final Address DEST=Util.createRandomAddress("A");
+ protected static final Address SRC=Util.createRandomAddress("B");
+ protected static final AsciiString CLUSTER=new AsciiString("cluster");
+
+ @DataProvider
+ static Object[][] createUnicast() {
+ return new Object[][]{
+ {UNICAST4.class}
+ };
+ }
+
+ @BeforeClass
+ protected void setupTimer() {
+ timer=new TimeScheduler3();
+ }
+
+ @AfterClass
+ protected void stopTimer() {
+ timer.stop();
+ }
+
+ protected void setup(Class extends ReliableUnicast> unicast_cl) throws Exception {
+ unicast=unicast_cl.getConstructor().newInstance();
+ unicast.addr(DEST);
+ up_prot=new UpProtocol();
+ down_prot=new DownProtocol();
+ transport=new MockTransport().cluster(CLUSTER).addr(DEST);
+ up_prot.setDownProtocol(unicast);
+ unicast.setUpProtocol(up_prot);
+ unicast.setDownProtocol(down_prot);
+ down_prot.setUpProtocol(unicast);
+ down_prot.setDownProtocol(transport);
+ transport.setUpProtocol(down_prot);
+ TimeService time_service=new TimeService(timer);
+ unicast.timeService(time_service);
+ unicast.lastSync(new ExpiryCache<>(5000));
+ transport.init();
+ }
+
+ @AfterMethod
+ protected void destroy() {
+ unicast.stop();
+ transport.stop();
+ transport.destroy();
+ }
+
+ public void testGetReceiverEntryFirst(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ ReliableUnicast.ReceiverEntry entry=unicast._getReceiverEntry(DEST, 1L, true, (short)0, null);
+ assert entry != null && entry.connId() == 0;
+ entry=unicast._getReceiverEntry(DEST, 1L, true, (short)0, null);
+ assert entry != null && entry.connId() == 0;
+ assert unicast.getNumReceiveConnections() == 1;
+ }
+
+ public void testGetReceiverEntryNotFirst(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ ReliableUnicast.ReceiverEntry entry=unicast._getReceiverEntry(DEST, 2L, false, (short)0, null);
+ assert entry == null;
+ assert down_prot.numSendFirstReqs() == 1;
+ }
+
+ public void testGetReceiverEntryExists(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ ReliableUnicast.ReceiverEntry entry=unicast._getReceiverEntry(DEST, 1L, true, (short)1, null);
+ ReliableUnicast.ReceiverEntry old=entry;
+ assert entry != null && entry.connId() == 1;
+
+ // entry exists, but this conn-ID is smaller
+ entry=unicast._getReceiverEntry(DEST, 1L, true, (short)0, null);
+ assert entry == null;
+
+ // entry exists and conn-IDs match
+ ReliableUnicast.ReceiverEntry e=unicast._getReceiverEntry(DEST, 2L, true, (short)1, null);
+ assert e != null && e == old;
+
+ // entry exists, but is replaced by higher conn_id
+ entry=unicast._getReceiverEntry(DEST, 5L, true, (short)2, null);
+ assert entry.connId() == 2;
+ assert entry.buf().high() == 4;
+
+ entry=unicast._getReceiverEntry(DEST, 10L, false, (short)3, null);
+ assert entry == null;
+ assert down_prot.numSendFirstReqs() == 1;
+ }
+
+ public void testBatch(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatch(false);
+ }
+
+ public void testBatchOOB(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatch(true);
+ }
+
+ public void testBatchWithFirstMissing(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithFirstMissing(false);
+ }
+
+ public void testBatchWithFirstMissingOOB(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithFirstMissing(true);
+ }
+
+ public void testBatchWithFirstMissingAndExistingMessages(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithFirstMissingAndExistingMessages(false);
+ }
+
+ public void testBatchWithFirstMissingAndExistingMessagesOOB(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithFirstMissingAndExistingMessages(true);
+ }
+
+ public void testBatchWithFirstMissingAndEmptyBatch(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithFirstMissingAndEmptyBatch(false);
+ }
+
+ public void testBatchWithFirstMissingAndEmptyBatchOOB(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithFirstMissingAndEmptyBatch(true);
+ }
+
+ public void testBatchWithDifferentConnIds(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithDifferentConnIds(false);
+ }
+
+ public void testBatchWithDifferentConnIdsOOB(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithDifferentConnIds(true);
+ }
+
+ public void testBatchWithDifferentConnIds2(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithDifferentConnIds2(false);
+ }
+
+ public void testBatchWithDifferentConnIds2OOB(Class extends ReliableUnicast> cl) throws Exception {
+ setup(cl);
+ testBatchWithDifferentConnIds2(true);
+ }
+
+ protected void testBatch(boolean oob) throws Exception {
+ MessageBatch mb=create(DEST, SRC, oob, 1, 10, (short)0);
+ unicast.up(mb);
+ List list=up_prot.list();
+ Util.waitUntilTrue(2000, 200, () -> list.size() == 10);
+ assert list.size() == 10;
+ List expected=IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
+ assert list.equals(expected);
+ }
+
+ protected void testBatchWithFirstMissing(boolean oob) throws Exception {
+ MessageBatch mb=create(DEST, SRC, oob, 1, 10, (short)0);
+ mb.array().set(0, null);
+ unicast.up(mb);
+ List list=up_prot.list();
+ Util.waitUntilTrue(1000, 200, () -> list.size() == 9);
+ assert list.isEmpty();
+ // Now send the first seqno:
+ mb=create(DEST, SRC, oob, 11, 10, (short)0);
+ Message msg=new ObjectMessage(DEST, 1).src(SRC)
+ .putHeader(unicast.getId(), UnicastHeader.createDataHeader(1L, (short)0, true));
+ if(oob)
+ msg.setFlag(Message.Flag.OOB);
+ mb.add(msg);
+ unicast.up(mb);
+ Util.waitUntil(2000, 200, () -> list.size() == 20);
+ List expected=IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
+ if(oob) {
+ expected.remove(0);
+ expected.add(1);
+ }
+ assert list.equals(expected);
+ }
+
+ protected void testBatchWithFirstMissingAndExistingMessages(boolean oob) throws Exception {
+ MessageBatch mb=create(DEST, SRC, oob, 1, 10, (short)0);
+ mb.array().set(0, null);
+ unicast.up(mb);
+ List list=up_prot.list();
+ Util.waitUntilTrue(1000, 200, () -> list.size() == 9);
+ assert list.isEmpty();
+
+ // Now send the first seqno, but also existing messages 1-10 (and new messages 11-20)
+ mb=create(DEST, SRC, oob, 1, 20, (short)0);
+ unicast.up(mb);
+ Util.waitUntil(2000, 200, () -> list.size() == 20);
+ List expected=IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
+ if(oob) {
+ expected.remove((Object)1);
+ expected.add(9, 1);
+ }
+ assert list.equals(expected);
+ }
+
+ protected void testBatchWithFirstMissingAndEmptyBatch(boolean oob) throws Exception {
+ MessageBatch mb=create(DEST, SRC, oob, 1, 10, (short)0);
+ mb.array().set(0, null);
+ unicast.up(mb);
+ List list=up_prot.list();
+ Util.waitUntilTrue(1000, 200, () -> list.size() == 9);
+ assert list.isEmpty();
+
+ // Now send the first seqno, but also existing messages 1-10 (and new messages 11-20)
+ mb=create(DEST, SRC, oob, 1, 1, (short)0);
+ unicast.up(mb);
+ Util.waitUntil(2000, 200, () -> list.size() == 10);
+ List expected=IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
+ if(oob) {
+ expected.remove((Object)1);
+ expected.add(9, 1);
+ }
+ assert list.equals(expected);
+ }
+
+ protected void testBatchWithDifferentConnIds(boolean oob) throws TimeoutException {
+ MessageBatch mb=create(DEST, SRC, oob, 1, 20, (short)0);
+ List buf=mb.array();
+ for(int i=0; i < buf.size(); i++) {
+ short conn_id=(short)Math.min(i, 10);
+ ((UnicastHeader)buf.get(i).getHeader(unicast.getId())).connId(conn_id);
+ }
+ List list=up_prot.list();
+ unicast.up(mb);
+ Util.waitUntilTrue(1000, 200, () -> list.size() == 10);
+ assert list.isEmpty();
+
+ Message msg=new ObjectMessage(DEST, 10).src(SRC)
+ .putHeader(unicast.getId(), UnicastHeader.createDataHeader(10, (short)10, true));
+ if(oob)
+ msg.setFlag(Message.Flag.OOB);
+ unicast.up(msg);
+ Util.waitUntil(2000, 200, () -> list.size() == 11);
+ List expected=IntStream.rangeClosed(10, 20).boxed().collect(Collectors.toList());
+ if(oob) {
+ expected.remove(0);
+ expected.add(10);
+ }
+ assert list.equals(expected) : String.format("expected %s, but got: %s", expected, list);
+ }
+
+ protected void testBatchWithDifferentConnIds2(boolean oob) throws TimeoutException {
+ MessageBatch mb=new MessageBatch(20).dest(DEST).sender(SRC).setMode(oob? OOB : REG);
+ short conn_id=5;
+ for(int i=20; i > 0; i--) {
+ if(i % 5 == 0)
+ conn_id--;
+ Message msg=new ObjectMessage(DEST, i).src(SRC)
+ .putHeader(unicast.getId(), UnicastHeader.createDataHeader(i, conn_id, false));
+ if(oob)
+ msg.setFlag(Message.Flag.OOB);
+ mb.add(msg);
+ }
+ List list=up_prot.list();
+ unicast.up(mb);
+ Util.waitUntilTrue(1000, 200, () -> list.size() == 5);
+ assert list.isEmpty();
+ Message msg=new ObjectMessage(DEST, 16).src(SRC)
+ .putHeader(unicast.getId(), UnicastHeader.createDataHeader(16, (short)4, true));
+ if(oob)
+ msg.setFlag(Message.Flag.OOB);
+ unicast.up(msg);
+ Util.waitUntilTrue(2000, 200, () -> list.size() == 5);
+ List expected=IntStream.rangeClosed(16, 20).boxed().collect(Collectors.toList());
+ if(oob)
+ Collections.reverse(expected);
+ assert list.equals(expected) : String.format("expected %s, but got: %s", expected, list);
+ }
+
+ protected MessageBatch create(Address dest, Address sender, boolean oob, int start_seqno, int num_msgs, short conn_id) {
+ MessageBatch mb=new MessageBatch(dest, sender, CLUSTER, false, oob? OOB : REG, 16);
+ for(int i=start_seqno; i < start_seqno+num_msgs; i++) {
+ Message msg=new ObjectMessage(dest, i).src(sender)
+ .putHeader(unicast.getId(), UnicastHeader.createDataHeader(i, conn_id, i == 1));
+ if(oob)
+ msg.setFlag(Message.Flag.OOB);
+ mb.add(msg);
+ }
+ return mb;
+ }
+
+ protected static class DownProtocol extends Protocol {
+ protected final LongAdder num_send_first_reqs=new LongAdder();
+
+ protected long numSendFirstReqs() {return num_send_first_reqs.sum();}
+ protected DownProtocol clear() {num_send_first_reqs.reset(); return this;}
+
+ @Override
+ public Object down(Message msg) {
+ UnicastHeader hdr=msg.getHeader(up_prot.getId());
+ if(hdr != null && hdr.type() == UnicastHeader.SEND_FIRST_SEQNO)
+ num_send_first_reqs.increment();
+ return null;
+ }
+ }
+
+ protected static class UpProtocol extends Protocol {
+ protected final List list=new ArrayList<>();
+ protected boolean raw_msgs;
+
+ protected List list() {return list;}
+ protected UpProtocol clear() {list.clear(); return this;}
+ public UpProtocol rawMsgs(boolean flag) {this.raw_msgs=flag; return this;}
+
+ @Override
+ public Object up(Message msg) {
+ T obj=raw_msgs? (T)msg : (T)msg.getObject();
+ synchronized(list) {
+ list.add(obj);
+ }
+ return null;
+ }
+
+ @Override
+ public void up(MessageBatch batch) {
+ synchronized(list) {
+ for(Message m: batch) {
+ T obj=raw_msgs? (T)m : (T)m.getObject();
+ list.add(obj);
+ }
+ }
+ }
+ }
+}
diff --git a/tests/junit-functional/org/jgroups/tests/UnicastUnitTest.java b/tests/junit-functional/org/jgroups/tests/UnicastUnitTest.java
index 4e7c0fc40ca..7a5ce40b808 100644
--- a/tests/junit-functional/org/jgroups/tests/UnicastUnitTest.java
+++ b/tests/junit-functional/org/jgroups/tests/UnicastUnitTest.java
@@ -7,15 +7,18 @@
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.MyReceiver;
import org.jgroups.util.ResourceManager;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -23,50 +26,55 @@
* Tests unicast functionality
* @author Bela Ban
*/
-@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
+@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="create")
public class UnicastUnitTest {
protected JChannel a, b, c, d;
+ @DataProvider
+ static Object[][] create() {
+ return new Object[][]{
+ {UNICAST3.class},
+ {UNICAST4.class}
+ };
+ }
+
@AfterMethod protected void tearDown() throws Exception {Util.closeReverse(a,b,c,d);}
- public void testUnicastMessageInCallbackExistingMember() throws Throwable {
+ public void testUnicastMessageInCallbackExistingMember(Class extends Protocol> cl) throws Throwable {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", false, mcast_addr);
- b=create("B", false, mcast_addr);
+ a=create(cl, "A", false, mcast_addr);
+ b=create(cl, "B", false, mcast_addr);
a.connect("UnicastUnitTest");
- MyReceiver receiver=new MyReceiver(a);
+ MyReceiver> receiver=new MyReceiver<>();
a.setReceiver(receiver);
b.connect("UnicastUnitTest");
a.setReceiver(null); // so the receiver doesn't get a view change
- Throwable ex=receiver.getEx();
- if(ex != null)
- throw ex;
}
/** Tests sending msgs from A to B */
// @Test(invocationCount=10)
- public void testMessagesToOther() throws Exception {
+ public void testMessagesToOther(Class extends Protocol> cl) throws Exception {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", false, mcast_addr);
- b=create("B", false, mcast_addr);
+ a=create(cl, "A", false, mcast_addr);
+ b=create(cl, "B", false, mcast_addr);
_testMessagesToOther();
}
- public void testMessagesToOtherBatching() throws Exception {
+ public void testMessagesToOtherBatching(Class extends Protocol> cl) throws Exception {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", true, mcast_addr);
- b=create("B", true, mcast_addr);
+ a=create(cl, "A", true, mcast_addr);
+ b=create(cl, "B", true, mcast_addr);
_testMessagesToOther();
}
- public void testMessagesToEverybodyElse() throws Exception {
- MyReceiver r1=new MyReceiver(), r2=new MyReceiver(), r3=new MyReceiver(), r4=new MyReceiver();
+ public void testMessagesToEverybodyElse(Class extends Protocol> cl) throws Exception {
+ MyReceiver r1=new MyReceiver<>(), r2=new MyReceiver<>(), r3=new MyReceiver<>(), r4=new MyReceiver<>();
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", false, mcast_addr);
- b=create("B", false, mcast_addr);
- c=create("C", false, mcast_addr);
- d=create("D", false, mcast_addr);
+ a=create(cl, "A", false, mcast_addr);
+ b=create(cl, "B", false, mcast_addr);
+ c=create(cl, "C", false, mcast_addr);
+ d=create(cl, "D", false, mcast_addr);
connect(a,b,c,d);
a.setReceiver(r1);
@@ -89,8 +97,7 @@ public void testMessagesToEverybodyElse() throws Exception {
Util.sleep(500);
}
- Stream.of(r1,r2,r3,r4).forEach(r -> System.out.printf("%s\n", r.list));
-
+ Stream.of(r1,r2,r3,r4).forEach(r -> System.out.printf("%s\n", r.list()));
List expected_list=Arrays.asList(1,2,3,4,5);
System.out.print("Checking (per-sender) FIFO ordering of messages: ");
Stream.of(r1,r2,r3,r4).forEach(r -> Stream.of(a, b, c, d).forEach(ch -> {
@@ -103,10 +110,10 @@ public void testMessagesToEverybodyElse() throws Exception {
System.out.println("OK");
}
- public void testPartition() throws Exception {
+ public void testPartition(Class extends Protocol> cl) throws Exception {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", false, mcast_addr);
- b=create("B", false, mcast_addr);
+ a=create(cl, "A", false, mcast_addr);
+ b=create(cl, "B", false, mcast_addr);
connect(a,b);
System.out.println("-- Creating network partition");
Stream.of(a,b).forEach(ch -> {
@@ -152,26 +159,28 @@ protected void _testMessagesToOther() throws Exception {
b.setReceiver(receiver);
send(a, msgs);
checkReception(receiver, false, 1,2,3,4,5);
+ checkUnackedMessages(0, a);
}
// @Test(invocationCount=10)
- public void testMessagesToSelf() throws Exception {
+ public void testMessagesToSelf(Class extends Protocol> cl) throws Exception {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", false, mcast_addr);
- b=create("B", false, mcast_addr);
+ a=create(cl, "A", false, mcast_addr);
+ b=create(cl, "B", false, mcast_addr);
_testMessagesToSelf();
}
- public void testMessagesToSelfBatching() throws Exception {
+ public void testMessagesToSelfBatching(Class extends Protocol> cl) throws Exception {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", true, mcast_addr);
- b=create("B", true, mcast_addr);
+ a=create(cl, "A", true, mcast_addr);
+ b=create(cl, "B", true, mcast_addr);
_testMessagesToSelf();
}
protected void _testMessagesToSelf() throws Exception {
connect(a,b);
+ Util.waitUntilAllChannelsHaveSameView(3000, 100, a,b);
Address dest=a.getAddress();
Message[] msgs={
msg(dest),
@@ -189,24 +198,26 @@ protected void _testMessagesToSelf() throws Exception {
a.setReceiver(receiver);
send(a, msgs);
checkReception(receiver, false, 1,2,3,5,8,9);
+ checkUnackedMessages(0, a);
}
- public void testMessagesToSelf2() throws Exception {
+ public void testMessagesToSelf2(Class extends Protocol> cl) throws Exception {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", false, mcast_addr);
- b=create("B", false, mcast_addr);
+ a=create(cl, "A", false, mcast_addr);
+ b=create(cl, "B", false, mcast_addr);
_testMessagesToSelf2();
}
- public void testMessagesToSelf2Batching() throws Exception {
+ public void testMessagesToSelf2Batching(Class extends Protocol> cl) throws Exception {
String mcast_addr=ResourceManager.getNextMulticastAddress();
- a=create("A", true, mcast_addr);
- b=create("B", true, mcast_addr);
+ a=create(cl, "A", true, mcast_addr);
+ b=create(cl, "B", true, mcast_addr);
_testMessagesToSelf2();
}
protected void _testMessagesToSelf2() throws Exception {
connect(a,b);
+ Util.waitUntilAllChannelsHaveSameView(3000, 100, a,b);
Address dest=a.getAddress();
Message[] msgs={
msg(dest).setFlag(Message.Flag.OOB).setFlag(Message.TransientFlag.DONT_LOOPBACK),
@@ -226,9 +237,9 @@ protected void _testMessagesToSelf2() throws Exception {
a.setReceiver(receiver);
send(a, msgs);
checkReception(receiver, false, 2,5,6,10);
+ checkUnackedMessages(0, a);
}
-
protected static void send(JChannel ch, Message... msgs) throws Exception {
int cnt=1;
for(Message msg: msgs) {
@@ -240,11 +251,7 @@ protected static void send(JChannel ch, Message... msgs) throws Exception {
protected static void checkReception(MyReceiver r, boolean check_order, int... num) {
List received=r.list();
- for(int i=0; i < 10; i++) {
- if(received.size() == num.length)
- break;
- Util.sleep(500);
- }
+ Util.waitUntilTrue(3000, 500, () -> received.size() == num.length);
List expected=new ArrayList<>(num.length);
for(int n: num) expected.add(n);
System.out.println("received=" + received + ", expected=" + expected);
@@ -255,9 +262,17 @@ protected static void checkReception(MyReceiver r, boolean check_order,
assert num[i] == received.get(i);
}
- protected static Message msg(Address dest) {return new BytesMessage(dest);}
+ protected static void checkUnackedMessages(int expected, JChannel ... channels) throws TimeoutException {
+ Util.waitUntil(3000, 100,
+ () -> Stream.of(channels).map(ch -> ch.stack().findProtocol(UNICAST3.class, UNICAST4.class))
+ .map(rp -> rp instanceof UNICAST4? ((UNICAST4)rp).getNumUnackedMessages()
+ : ((UNICAST3)rp).getNumUnackedMessages())
+ .allMatch(num -> num == expected));
+ }
+
+ protected static Message msg(Address dest) {return new ObjectMessage(dest);}
- protected static JChannel create(String name, boolean use_batching, String mcast_addr) throws Exception {
+ protected static JChannel create(Class extends Protocol> cl, String name, boolean use_batching, String mcast_addr) throws Exception {
Protocol[] protocols={
new UDP().setMcastGroupAddr(InetAddress.getByName(mcast_addr)).setBindAddress(Util.getLoopback()),
new LOCAL_PING(),
@@ -265,7 +280,8 @@ protected static JChannel create(String name, boolean use_batching, String mcast
new FD_ALL3().setTimeout(2000).setInterval(500),
new NAKACK2(),
new MAKE_BATCH().sleepTime(100).unicasts(use_batching),
- new UNICAST3(),
+ //new UNBATCH(),
+ cl.getConstructor().newInstance(),
new STABLE(),
new GMS().setJoinTimeout(1000),
new FRAG2().setFragSize(8000),
@@ -279,41 +295,4 @@ protected static void connect(JChannel... channels) throws Exception {
Util.waitUntilAllChannelsHaveSameView(10000, 1000, channels);
}
-
- protected static class MyReceiver implements Receiver {
- protected JChannel channel;
- protected Throwable ex;
- protected final List list=new ArrayList<>();
-
- public MyReceiver() {this(null);}
- public MyReceiver(JChannel ch) {this.channel=ch;}
- public Throwable getEx() {return ex;}
- public List