From 878ae6dbcf2143fb942a79f3a69971f4dc5938e8 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Tue, 31 Mar 2020 16:21:42 +0200 Subject: [PATCH] - Initial implementation of RED (https://issues.redhat.com/browse/JGRP-2462) - Updated manual on RED --- doc/manual/protocols.adoc | 11 ++ src/org/jgroups/protocols/BaseBundler.java | 5 +- src/org/jgroups/protocols/Bundler.java | 7 + src/org/jgroups/protocols/DELAY.java | 4 +- src/org/jgroups/protocols/RED.java | 153 ++++++++++++++++++ .../jgroups/protocols/RemoveQueueBundler.java | 4 + .../jgroups/protocols/RingBufferBundler.java | 1 + .../protocols/RingBufferBundlerLockless.java | 2 +- .../protocols/RingBufferBundlerLockless2.java | 1 + src/org/jgroups/protocols/TP.java | 16 +- .../protocols/TransferQueueBundler.java | 4 + src/org/jgroups/util/AverageMinMax.java | 2 +- .../org/jgroups/protocols/RED_Test.java | 130 +++++++++++++++ 13 files changed, 324 insertions(+), 16 deletions(-) create mode 100644 src/org/jgroups/protocols/RED.java create mode 100644 tests/junit-functional/org/jgroups/protocols/RED_Test.java diff --git a/doc/manual/protocols.adoc b/doc/manual/protocols.adoc index bc2c0cdf23e..381f9d898cc 100644 --- a/doc/manual/protocols.adoc +++ b/doc/manual/protocols.adoc @@ -2263,6 +2263,17 @@ RATE_LIMITER can be used to set a limit on the data sent per time unit. When sen ${RATE_LIMITER} +==== Random Early Drop (RED) +RED is an implementation of a Random Early Detect (or Drop) protocol. It measures the queue size of +the bundler in the transport and drops a message if the bundler's queue is starting to get full. When +the queue is full, all messages will be dropped (tail drop). + +The `RED` protocol should be placed above the transport. + + +${RED} + + [[LockingProtocols]] ==== Locking protocols diff --git a/src/org/jgroups/protocols/BaseBundler.java b/src/org/jgroups/protocols/BaseBundler.java index 39e35202a97..84459bdf679 100644 --- a/src/org/jgroups/protocols/BaseBundler.java +++ b/src/org/jgroups/protocols/BaseBundler.java @@ -42,11 +42,11 @@ public void viewChange(View view) { // code removed (https://issues.jboss.org/browse/JGRP-2324) } + /** Returns the total number of messages in the hashmap */ public int size() { lock.lock(); try { - long num=msgs.values().stream().flatMap(Collection::stream).map(Message::size).reduce(0L, (a, b) -> a + b); - return (int)num; + return msgs.values().stream().map(List::size).reduce(0, Integer::sum); } finally { lock.unlock(); @@ -62,7 +62,6 @@ public int size() { List list=entry.getValue(); if(list.isEmpty()) continue; - output.position(0); if(list.size() == 1) sendSingleMessage(list.get(0)); diff --git a/src/org/jgroups/protocols/Bundler.java b/src/org/jgroups/protocols/Bundler.java index 29a751bd561..083427c672f 100644 --- a/src/org/jgroups/protocols/Bundler.java +++ b/src/org/jgroups/protocols/Bundler.java @@ -26,6 +26,13 @@ default void viewChange(View view) {} /** The number of unsent messages in the bundler */ int size(); + /** + * If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then + * return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
+ * This method needs to be fast as it might get called on every message to be sent. + */ + default int getQueueSize() {return -1;} + /** * Returns stats about the bundler itself. * @return Stats, may be null diff --git a/src/org/jgroups/protocols/DELAY.java b/src/org/jgroups/protocols/DELAY.java index b7323b68a1a..370e43ad549 100644 --- a/src/org/jgroups/protocols/DELAY.java +++ b/src/org/jgroups/protocols/DELAY.java @@ -121,9 +121,7 @@ public long getDelay(TimeUnit unit) { public int compareTo(Delayed o) { if (o == this) return 0; - // return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS)); // JDK 7 only - long my_delay=this.getDelay(TimeUnit.NANOSECONDS), other_delay=o.getDelay(TimeUnit.NANOSECONDS); - return (my_delay < other_delay) ? -1 : ((my_delay == other_delay) ? 0 : 1); + return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS)); } } diff --git a/src/org/jgroups/protocols/RED.java b/src/org/jgroups/protocols/RED.java new file mode 100644 index 00000000000..1f600f22872 --- /dev/null +++ b/src/org/jgroups/protocols/RED.java @@ -0,0 +1,153 @@ +package org.jgroups.protocols; + +import org.jgroups.Message; +import org.jgroups.annotations.MBean; +import org.jgroups.annotations.ManagedAttribute; +import org.jgroups.annotations.Property; +import org.jgroups.stack.Protocol; +import org.jgroups.util.Util; + +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Implementation of Random Early Drop: messages are discarded when the bundler's queue in the transport nears exhaustion. + * See Floyd and Van Jacobsen's paper for details. + * @author Bela Ban + * @since 5.0.0, 4.2.2 + */ +@MBean(description="Implementation of Random Early Drop: messages are discarded when the bundler's queue in " + + "the transport nears exhaustion") +public class RED extends Protocol { + + @Property(description="If false, all messages are passed down. Will be set to false if the bundler " + + "returns a queue size of -1") + protected boolean enabled=true; + + @ManagedAttribute(description="The capacity of the queue (assumed to be constant)") + protected int queue_capacity; + + @Property(description="The min threshold (percentage between 0 and 1.0) below which no message is dropped") + protected double min_threshold=0.5; + protected long min; + + @Property(description="The max threshold (percentage between min_threshold and 1.0) above which all messages are dropped") + protected double max_threshold=1.0; + protected long max; + + @ManagedAttribute(description="The average number of elements in the bundler's queue. Computed as " + + "o * (1 - 2^-wf) + c * (2^-wf) where o is the old average, c the current queue size amd wf the weight_factor") + protected double avg_queue_size; + + @Property(description="The weight used to compute the average queue size. The higher the value is, the less the " + + "current queue size is taken into account. E.g. with 2, 25% of the current queue size and 75% of the old " + + "average is taken to compute the new average. In other words: with a high value, the average will take " + + "longer to reflect the current queueu size.") + protected double weight_factor=2; + + protected final LongAdder dropped_msgs=new LongAdder(); // dropped messages + protected final LongAdder total_msgs=new LongAdder(); // total messages looked at + + protected Bundler bundler; + protected final Lock lock=new ReentrantLock(); + protected long span=max-min; // diff between max and min + protected double weight=Math.pow(2, -weight_factor); + + + public boolean isEnabled() {return enabled;} + public RED setEnabled(boolean e) {enabled=e; return this;} + public double getMinThreshold() {return min_threshold;} + + + + @ManagedAttribute(description="The number of dropped messages") + public long getDroppedMessages() {return dropped_msgs.sum();} + + @ManagedAttribute(description="Total number of messages processed") + public long getTotalMessages() {return total_msgs.sum();} + + @ManagedAttribute(description="Percentage of all messages that were dropped") + public double getDropRate() {return dropped_msgs.sum() / (double)total_msgs.sum();} + + + public void start() throws Exception { + super.start(); + bundler=getTransport().getBundler(); + enabled=bundler != null && bundler.getQueueSize() >= 0; + if(enabled) { + queue_capacity=getTransport().getBundlerCapacity(); + min=(long)(queue_capacity * checkRange(min_threshold, 0, 1, "min_threshold")); + max=(long)(queue_capacity * checkRange(max_threshold, 0, 1, "max_threshold")); + span=max-min; + weight=Math.pow(2, -weight_factor); + } + } + + public void resetStats() { + super.resetStats(); + avg_queue_size=0; + dropped_msgs.reset(); + total_msgs.reset(); + } + + public Object down(Message msg) { + if(enabled) { + int current_queue_size=bundler.getQueueSize(); + double avg; + lock.lock(); + try { + avg=avg_queue_size=computeAverage(avg_queue_size, current_queue_size); + // System.out.printf("-- avg=%.2f, size=%d\n", avg, current_queue_size); + } + finally { + lock.unlock(); + } + + total_msgs.increment(); + if(avg <= min) + ; // message will be sent + else if(avg >= max) + return null; // message will be dropped + else { // min_threshold < avg < max_threshold + // message will be dropped with probability p + double p=computeDropProbability(avg); + if(Util.tossWeightedCoin(p)) { + dropped_msgs.increment(); + return null; // drop the message + } + } + } + return down_prot.down(msg); + } + + public String toString() { + return String.format("enabled=%b, queue capacity=%d, min=%d, max=%d, avg-queue-size=%.2f, " + + "total=%d dropped=%d (%d%%)", enabled, queue_capacity, min, max, avg_queue_size, + total_msgs.sum(), dropped_msgs.sum(), (int)(getDropRate()*100.0)); + } + + protected double computeAverage(double old_avg, int new_queue_size) { + return old_avg * (1 - weight) + new_queue_size * weight; + } + + /** Computes a probability P with which the message should get dropped. min_threshold < avg < max_threshold. + * Probability increases linearly with min moving toward max */ + protected double computeDropProbability(double avg) { + return Math.min(1, (avg-min) / span); + } + + protected static double checkRange(double val, double min, double max, String name) { + if(val < min || val > max) + throw new IllegalArgumentException(String.format("%s (%.2f) needs to be in range [%.2f..%.2f]", name, val, min, max)); + return val; + } + + /* public static void main(String[] args) { + RED red=new RED(); + for(int i=0; i <= 1030; i++) { + double p=red.computeDropProbability(i); + System.out.printf("i=%d, drop-p=%.2f\n", i, p); + } + }*/ +} diff --git a/src/org/jgroups/protocols/RemoveQueueBundler.java b/src/org/jgroups/protocols/RemoveQueueBundler.java index 6d233cefd40..b80dded2812 100644 --- a/src/org/jgroups/protocols/RemoveQueueBundler.java +++ b/src/org/jgroups/protocols/RemoveQueueBundler.java @@ -106,6 +106,10 @@ public void run() { } } + public int getQueueSize() { + return rb.size(); + } + public int size() { return rb.size(); } diff --git a/src/org/jgroups/protocols/RingBufferBundler.java b/src/org/jgroups/protocols/RingBufferBundler.java index 2722ca3dfd1..2096b600575 100644 --- a/src/org/jgroups/protocols/RingBufferBundler.java +++ b/src/org/jgroups/protocols/RingBufferBundler.java @@ -56,6 +56,7 @@ public RingBufferBundler(int capacity) { public RingBuffer buf() {return rb;} public Thread getThread() {return bundler_thread.getThread();} public int size() {return rb.size();} + public int getQueueSize() {return rb.size();} public int numSpins() {return num_spins;} public RingBufferBundler numSpins(int n) {num_spins=n; return this;} public String waitStrategy() {return print(wait_strategy);} diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless.java b/src/org/jgroups/protocols/RingBufferBundlerLockless.java index 44600a55129..76a405d7770 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless.java @@ -49,7 +49,7 @@ public RingBufferBundlerLockless(int capacity) { public int readIndex() {return read_index;} public int writeIndex() {return write_index;} public int size() {return size.get();} - + public int getQueueSize() {return size.get();} public void init(TP transport) { super.init(transport); diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java index c726bc156a0..78863d123df 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java @@ -50,6 +50,7 @@ public RingBufferBundlerLockless2(int capacity, boolean padded) { public int readIndex() {return read_index.get();} public int writeIndex() {return write_index.get();} public RingBufferBundlerLockless2 reset() {ri=0; read_index.set(0); write_index.set(1); return this;} + public int getQueueSize() {return size();} public int size() {return _size(read_index.get(), write_index.get());} protected int _size(int ri, int wi) { diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index c362980c9f6..3c3b20845a6 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -257,10 +257,11 @@ public T setMaxBundleSize(int size) { max_bundle_size=size; return (T)this; } - public final int getMaxBundleSize() {return max_bundle_size;} - public int getBundlerCapacity() {return bundler_capacity;} - public int getMessageProcessingMaxBufferSize() {return msg_processing_max_buffer_size;} - public boolean useFibers() {return use_fibers;} + public final int getMaxBundleSize() {return max_bundle_size;} + public int getBundlerCapacity() {return bundler_capacity;} + public T setBundlerCapacity(int c) {this.bundler_capacity=c; return (T)this;} + public int getMessageProcessingMaxBufferSize() {return msg_processing_max_buffer_size;} + public boolean useFibers() {return use_fibers;} @ManagedAttribute public int getBundlerBufferSize() { if(bundler instanceof TransferQueueBundler) @@ -945,11 +946,10 @@ public void start() throws Exception { } fetchLocalAddresses(); startDiagnostics(); - if(bundler == null) { + if(bundler == null) bundler=createBundler(bundler_type); - bundler.init(this); - bundler.start(); - } + bundler.init(this); + bundler.start(); // local_addr is null when shared transport setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern); } diff --git a/src/org/jgroups/protocols/TransferQueueBundler.java b/src/org/jgroups/protocols/TransferQueueBundler.java index f759a064f3a..36253c12392 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler.java +++ b/src/org/jgroups/protocols/TransferQueueBundler.java @@ -92,6 +92,10 @@ public int size() { return super.size() + removeQueueSize() + getBufferSize(); } + public int getQueueSize() { + return queue.size(); + } + public void send(Message msg) throws Exception { if(running) queue.put(msg); diff --git a/src/org/jgroups/util/AverageMinMax.java b/src/org/jgroups/util/AverageMinMax.java index 76af88bb48f..8b770bdecc5 100644 --- a/src/org/jgroups/util/AverageMinMax.java +++ b/src/org/jgroups/util/AverageMinMax.java @@ -71,7 +71,7 @@ protected double stddev() { if(values == null) return -1.0; double av=average(); int size=values.size(); - double variance=values.stream().map(v -> (v - av)*(v - av)).reduce(0.0, (x, y) -> x + y) / size; + double variance=values.stream().map(v -> (v - av)*(v - av)).reduce(0.0, Double::sum) / size; return Math.sqrt(variance); } diff --git a/tests/junit-functional/org/jgroups/protocols/RED_Test.java b/tests/junit-functional/org/jgroups/protocols/RED_Test.java new file mode 100644 index 00000000000..48025d7e319 --- /dev/null +++ b/tests/junit-functional/org/jgroups/protocols/RED_Test.java @@ -0,0 +1,130 @@ +package org.jgroups.protocols; + +import org.jgroups.Address; +import org.jgroups.Global; +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.stack.ProtocolStack; +import org.jgroups.util.AverageMinMax; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Stream; + +/** + * Tests {@link RED} + * @author Bela Ban + * @since 5.0.0 + */ +@Test(groups=Global.FUNCTIONAL,singleThreaded=true) +public class RED_Test { + protected JChannel ch; + protected DelayBundler bundler; + protected RED red; + protected TP transport; + protected static final Address TARGET=Util.createRandomAddress("B"); + protected static final int NUM_SENDERS=10, NUM_MSGS=1000, TOT_MSGS=NUM_SENDERS*NUM_MSGS; + + @BeforeMethod protected void setup() throws Exception { + ch=create("A").connect(RED_Test.class.getSimpleName()); + } + + @AfterMethod protected void destroy() {Util.close(ch);} + + public void testNoMessageDrops() throws Exception { + for(int i=1; i <= 10; i++) + ch.send(TARGET, i); + System.out.printf("red: %s\nbundler: %s\n", red, bundler); + Util.waitUntil(10000, 500, () -> bundler.getSentMessages() + red.getDroppedMessages() >= 10, + () -> String.format("sent msgs (%d) and dropped msgs (%d) need to be >= %d", + bundler.getSentMessages(), red.getDroppedMessages(), 10)); + } + + public void testMessageDrops() throws TimeoutException { + final Thread[] senders=new Thread[NUM_SENDERS]; + for(int i=0; i < senders.length; i++) { + senders[i]=new Thread(() -> { + long start=System.currentTimeMillis(); + for(int j=0; j < NUM_MSGS; j++) { + try { + ch.send(TARGET, Thread.currentThread().getId() + "-" + j); + } + catch(Exception e) { + e.printStackTrace(); + } + } + long time=System.currentTimeMillis()-start; + System.out.printf("%s: sent %d messages in %d ms\n", Thread.currentThread(), NUM_MSGS, time); + }); + } + Stream.of(senders).parallel().forEach(Thread::start); + + Stream.of(senders).forEach(t -> { + try { + t.join(30000); + } + catch(InterruptedException e) { + e.printStackTrace(); + } + }); + + assert Stream.of(senders).noneMatch(Thread::isAlive); + Util.waitUntil(10000, 500, () -> bundler.getSentMessages() + red.getDroppedMessages() >= TOT_MSGS, + () -> String.format("sent msgs (%d) and dropped msgs (%d) need to be >= %d", + bundler.getSentMessages(), red.getDroppedMessages(), TOT_MSGS)); + System.out.printf("red: %s\nbundler: %s\n", red, bundler); + assert red.getDroppedMessages() > 0; + } + + + protected JChannel create(String name) throws Exception { + JChannel retval=new JChannel(Util.getTestStack()).name(name); + red=new RED(); + transport=retval.getProtocolStack().getTransport(); + transport.setBundlerCapacity(1024); + transport.getProtocolStack().removeProtocol(UNICAST3.class); + retval.getProtocolStack().insertProtocolInStack(red, transport, ProtocolStack.Position.ABOVE); + bundler=new DelayBundler(); + bundler.init(transport); + transport.setBundler(bundler); + ((GMS)retval.getProtocolStack().findProtocol(GMS.class)).setJoinTimeout(5); + return retval; + } + + protected static class DelayBundler extends TransferQueueBundler { + protected final LongAdder sent=new LongAdder(), single=new LongAdder(), batches=new LongAdder(); + protected final AverageMinMax avg_batch_size=new AverageMinMax(); + + protected long getSentMessages() {return sent.sum();} + protected long getSingle() {return single.sum();} + protected long getBatches() {return batches.sum();} + protected double getAvgBatchSize() {return avg_batch_size.getAverage();} + + protected void sendSingleMessage(Message msg) { + sent.increment(); + single.increment(); + Util.sleepRandom(5, 100); + } + + protected void sendMessageList(Address dest, Address src, List list) { + if(list != null) { + int size=list.size(); + batches.increment(); + sent.add(size); + avg_batch_size.add(size); + } + Util.sleepRandom(2, 100); + } + + public String toString() { + return String.format("sent=%d (single=%d, batches=%d) avg-batch=%s", + getSentMessages(), getSingle(), getBatches(), avg_batch_size); + } + } +}