Skip to content

Commit

Permalink
- Initial implementation of RED (https://issues.redhat.com/browse/JGR…
Browse files Browse the repository at this point in the history
…P-2462)

- Updated manual on RED
  • Loading branch information
belaban committed Mar 31, 2020
1 parent 4c01a03 commit 878ae6d
Show file tree
Hide file tree
Showing 13 changed files with 324 additions and 16 deletions.
11 changes: 11 additions & 0 deletions doc/manual/protocols.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,17 @@ RATE_LIMITER can be used to set a limit on the data sent per time unit. When sen

${RATE_LIMITER}

==== Random Early Drop (RED)
RED is an implementation of a Random Early Detect (or Drop) protocol. It measures the queue size of
the bundler in the transport and drops a message if the bundler's queue is starting to get full. When
the queue is full, all messages will be dropped (tail drop).

The `RED` protocol should be placed above the transport.


${RED}


[[LockingProtocols]]
==== Locking protocols

Expand Down
5 changes: 2 additions & 3 deletions src/org/jgroups/protocols/BaseBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ public void viewChange(View view) {
// code removed (https://issues.jboss.org/browse/JGRP-2324)
}

/** Returns the total number of messages in the hashmap */
public int size() {
lock.lock();
try {
long num=msgs.values().stream().flatMap(Collection::stream).map(Message::size).reduce(0L, (a, b) -> a + b);
return (int)num;
return msgs.values().stream().map(List::size).reduce(0, Integer::sum);
}
finally {
lock.unlock();
Expand All @@ -62,7 +62,6 @@ public int size() {
List<Message> list=entry.getValue();
if(list.isEmpty())
continue;

output.position(0);
if(list.size() == 1)
sendSingleMessage(list.get(0));
Expand Down
7 changes: 7 additions & 0 deletions src/org/jgroups/protocols/Bundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ default void viewChange(View view) {}
/** The number of unsent messages in the bundler */
int size();

/**
* If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then
* return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.<br/>
* This method needs to be fast as it might get called on every message to be sent.
*/
default int getQueueSize() {return -1;}

/**
* Returns stats about the bundler itself.
* @return Stats, may be null
Expand Down
4 changes: 1 addition & 3 deletions src/org/jgroups/protocols/DELAY.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ public long getDelay(TimeUnit unit) {
public int compareTo(Delayed o) {
if (o == this)
return 0;
// return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS)); // JDK 7 only
long my_delay=this.getDelay(TimeUnit.NANOSECONDS), other_delay=o.getDelay(TimeUnit.NANOSECONDS);
return (my_delay < other_delay) ? -1 : ((my_delay == other_delay) ? 0 : 1);
return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
}
}

Expand Down
153 changes: 153 additions & 0 deletions src/org/jgroups/protocols/RED.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package org.jgroups.protocols;

import org.jgroups.Message;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;

import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Implementation of Random Early Drop: messages are discarded when the bundler's queue in the transport nears exhaustion.
* See Floyd and Van Jacobsen's paper for details.
* @author Bela Ban
* @since 5.0.0, 4.2.2
*/
@MBean(description="Implementation of Random Early Drop: messages are discarded when the bundler's queue in " +
"the transport nears exhaustion")
public class RED extends Protocol {

@Property(description="If false, all messages are passed down. Will be set to false if the bundler " +
"returns a queue size of -1")
protected boolean enabled=true;

@ManagedAttribute(description="The capacity of the queue (assumed to be constant)")
protected int queue_capacity;

@Property(description="The min threshold (percentage between 0 and 1.0) below which no message is dropped")
protected double min_threshold=0.5;
protected long min;

@Property(description="The max threshold (percentage between min_threshold and 1.0) above which all messages are dropped")
protected double max_threshold=1.0;
protected long max;

@ManagedAttribute(description="The average number of elements in the bundler's queue. Computed as " +
"o * (1 - 2^-wf) + c * (2^-wf) where o is the old average, c the current queue size amd wf the weight_factor")
protected double avg_queue_size;

@Property(description="The weight used to compute the average queue size. The higher the value is, the less the " +
"current queue size is taken into account. E.g. with 2, 25% of the current queue size and 75% of the old " +
"average is taken to compute the new average. In other words: with a high value, the average will take " +
"longer to reflect the current queueu size.")
protected double weight_factor=2;

protected final LongAdder dropped_msgs=new LongAdder(); // dropped messages
protected final LongAdder total_msgs=new LongAdder(); // total messages looked at

protected Bundler bundler;
protected final Lock lock=new ReentrantLock();
protected long span=max-min; // diff between max and min
protected double weight=Math.pow(2, -weight_factor);


public boolean isEnabled() {return enabled;}
public RED setEnabled(boolean e) {enabled=e; return this;}
public double getMinThreshold() {return min_threshold;}



@ManagedAttribute(description="The number of dropped messages")
public long getDroppedMessages() {return dropped_msgs.sum();}

@ManagedAttribute(description="Total number of messages processed")
public long getTotalMessages() {return total_msgs.sum();}

@ManagedAttribute(description="Percentage of all messages that were dropped")
public double getDropRate() {return dropped_msgs.sum() / (double)total_msgs.sum();}


public void start() throws Exception {
super.start();
bundler=getTransport().getBundler();
enabled=bundler != null && bundler.getQueueSize() >= 0;
if(enabled) {
queue_capacity=getTransport().getBundlerCapacity();
min=(long)(queue_capacity * checkRange(min_threshold, 0, 1, "min_threshold"));
max=(long)(queue_capacity * checkRange(max_threshold, 0, 1, "max_threshold"));
span=max-min;
weight=Math.pow(2, -weight_factor);
}
}

public void resetStats() {
super.resetStats();
avg_queue_size=0;
dropped_msgs.reset();
total_msgs.reset();
}

public Object down(Message msg) {
if(enabled) {
int current_queue_size=bundler.getQueueSize();
double avg;
lock.lock();
try {
avg=avg_queue_size=computeAverage(avg_queue_size, current_queue_size);
// System.out.printf("-- avg=%.2f, size=%d\n", avg, current_queue_size);
}
finally {
lock.unlock();
}

total_msgs.increment();
if(avg <= min)
; // message will be sent
else if(avg >= max)
return null; // message will be dropped
else { // min_threshold < avg < max_threshold
// message will be dropped with probability p
double p=computeDropProbability(avg);
if(Util.tossWeightedCoin(p)) {
dropped_msgs.increment();
return null; // drop the message
}
}
}
return down_prot.down(msg);
}

public String toString() {
return String.format("enabled=%b, queue capacity=%d, min=%d, max=%d, avg-queue-size=%.2f, " +
"total=%d dropped=%d (%d%%)", enabled, queue_capacity, min, max, avg_queue_size,
total_msgs.sum(), dropped_msgs.sum(), (int)(getDropRate()*100.0));
}

protected double computeAverage(double old_avg, int new_queue_size) {
return old_avg * (1 - weight) + new_queue_size * weight;
}

/** Computes a probability P with which the message should get dropped. min_threshold < avg < max_threshold.
* Probability increases linearly with min moving toward max */
protected double computeDropProbability(double avg) {
return Math.min(1, (avg-min) / span);
}

protected static double checkRange(double val, double min, double max, String name) {
if(val < min || val > max)
throw new IllegalArgumentException(String.format("%s (%.2f) needs to be in range [%.2f..%.2f]", name, val, min, max));
return val;
}

/* public static void main(String[] args) {
RED red=new RED();
for(int i=0; i <= 1030; i++) {
double p=red.computeDropProbability(i);
System.out.printf("i=%d, drop-p=%.2f\n", i, p);
}
}*/
}
4 changes: 4 additions & 0 deletions src/org/jgroups/protocols/RemoveQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public void run() {
}
}

public int getQueueSize() {
return rb.size();
}

public int size() {
return rb.size();
}
Expand Down
1 change: 1 addition & 0 deletions src/org/jgroups/protocols/RingBufferBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public RingBufferBundler(int capacity) {
public RingBuffer<Message> buf() {return rb;}
public Thread getThread() {return bundler_thread.getThread();}
public int size() {return rb.size();}
public int getQueueSize() {return rb.size();}
public int numSpins() {return num_spins;}
public RingBufferBundler numSpins(int n) {num_spins=n; return this;}
public String waitStrategy() {return print(wait_strategy);}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundlerLockless.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public RingBufferBundlerLockless(int capacity) {
public int readIndex() {return read_index;}
public int writeIndex() {return write_index;}
public int size() {return size.get();}

public int getQueueSize() {return size.get();}

public void init(TP transport) {
super.init(transport);
Expand Down
1 change: 1 addition & 0 deletions src/org/jgroups/protocols/RingBufferBundlerLockless2.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public RingBufferBundlerLockless2(int capacity, boolean padded) {
public int readIndex() {return read_index.get();}
public int writeIndex() {return write_index.get();}
public RingBufferBundlerLockless2 reset() {ri=0; read_index.set(0); write_index.set(1); return this;}
public int getQueueSize() {return size();}
public int size() {return _size(read_index.get(), write_index.get());}

protected int _size(int ri, int wi) {
Expand Down
16 changes: 8 additions & 8 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,11 @@ public <T extends TP> T setMaxBundleSize(int size) {
max_bundle_size=size;
return (T)this;
}
public final int getMaxBundleSize() {return max_bundle_size;}
public int getBundlerCapacity() {return bundler_capacity;}
public int getMessageProcessingMaxBufferSize() {return msg_processing_max_buffer_size;}
public boolean useFibers() {return use_fibers;}
public final int getMaxBundleSize() {return max_bundle_size;}
public int getBundlerCapacity() {return bundler_capacity;}
public <T extends TP> T setBundlerCapacity(int c) {this.bundler_capacity=c; return (T)this;}
public int getMessageProcessingMaxBufferSize() {return msg_processing_max_buffer_size;}
public boolean useFibers() {return use_fibers;}

@ManagedAttribute public int getBundlerBufferSize() {
if(bundler instanceof TransferQueueBundler)
Expand Down Expand Up @@ -945,11 +946,10 @@ public void start() throws Exception {
}
fetchLocalAddresses();
startDiagnostics();
if(bundler == null) {
if(bundler == null)
bundler=createBundler(bundler_type);
bundler.init(this);
bundler.start();
}
bundler.init(this);
bundler.start();
// local_addr is null when shared transport
setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern);
}
Expand Down
4 changes: 4 additions & 0 deletions src/org/jgroups/protocols/TransferQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ public int size() {
return super.size() + removeQueueSize() + getBufferSize();
}

public int getQueueSize() {
return queue.size();
}

public void send(Message msg) throws Exception {
if(running)
queue.put(msg);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/util/AverageMinMax.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected double stddev() {
if(values == null) return -1.0;
double av=average();
int size=values.size();
double variance=values.stream().map(v -> (v - av)*(v - av)).reduce(0.0, (x, y) -> x + y) / size;
double variance=values.stream().map(v -> (v - av)*(v - av)).reduce(0.0, Double::sum) / size;
return Math.sqrt(variance);
}

Expand Down
Loading

0 comments on commit 878ae6d

Please sign in to comment.