Skip to content

QFJ-943: optional watermarks-based back pressure propagation from inbo… #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package quickfix;

public abstract class AbstractSessionConnectorBuilder<Derived, Product> {
private final Class<Derived> derived;
Application application;
MessageStoreFactory messageStoreFactory;
SessionSettings settings;
LogFactory logFactory;
MessageFactory messageFactory;

int queueCapacity = -1;
int queueLowerWatermark = -1;
int queueUpperWatermark = -1;

AbstractSessionConnectorBuilder(Class<Derived> derived) {
this.derived = derived;
}

public Derived withApplication(Application val) throws ConfigError {
application = val;
return derived.cast(this);
}

public Derived withMessageStoreFactory(MessageStoreFactory val) throws ConfigError {
messageStoreFactory = val;
return derived.cast(this);
}

public Derived withSettings(SessionSettings val) {
settings = val;
return derived.cast(this);
}

public Derived withLogFactory(LogFactory val) throws ConfigError {
logFactory = val;
return derived.cast(this);
}

public Derived withMessageFactory(MessageFactory val) throws ConfigError {
messageFactory = val;
return derived.cast(this);
}

public Derived withQueueCapacity(int val) throws ConfigError {
if (queueLowerWatermark >= 0) {
throw new ConfigError("queue capacity and watermarks may not be configured together");
} else if (queueCapacity < 0) {
throw new ConfigError("negative queue capacity");
}
queueCapacity = val;
return derived.cast(this);
}

public Derived withQueueWatermarks(int lower, int upper) throws ConfigError {
if (queueCapacity >= 0) {
throw new ConfigError("queue capacity and watermarks may not be configured together");
} else if (queueLowerWatermark < 0 || queueUpperWatermark <= queueLowerWatermark) {
throw new ConfigError("invalid queue watermarks, required: 0 <= lower watermark < upper watermark");
}
queueLowerWatermark = lower;
queueUpperWatermark = upper;
return derived.cast(this);
}

public final Product build() throws ConfigError {
if (logFactory == null) {
logFactory = new ScreenLogFactory(settings);
}

return doBuild();
}

protected abstract Product doBuild() throws ConfigError;
}
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/SocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ public class SocketAcceptor extends AbstractSocketAcceptor {
private volatile Boolean isStarted = Boolean.FALSE;
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;

private SocketAcceptor(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, SocketAcceptor> {
private Builder() {
super(Builder.class);
}

@Override
protected SocketAcceptor doBuild() throws ConfigError {
return new SocketAcceptor(this);
}
}

public SocketAcceptor(Application application, MessageStoreFactory messageStoreFactory,
SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory,
int queueCapacity)
Expand Down
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/SocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ public class SocketInitiator extends AbstractSocketInitiator {
private volatile Boolean isStarted = Boolean.FALSE;
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;

private SocketInitiator(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, SocketInitiator> {
private Builder() {
super(Builder.class);
}

@Override
protected SocketInitiator doBuild() throws ConfigError {
return new SocketInitiator(this);
}
}

public SocketInitiator(Application application, MessageStoreFactory messageStoreFactory,
SessionSettings settings, MessageFactory messageFactory, int queueCapacity) throws ConfigError {
super(application, messageStoreFactory, settings, new ScreenLogFactory(settings),
Expand Down
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,34 @@
public class ThreadedSocketAcceptor extends AbstractSocketAcceptor {
private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy;

private ThreadedSocketAcceptor(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, ThreadedSocketAcceptor> {
private Builder() {
super(Builder.class);
}

@Override
protected ThreadedSocketAcceptor doBuild() throws ConfigError {
return new ThreadedSocketAcceptor(this);
}
}

public ThreadedSocketAcceptor(Application application, MessageStoreFactory messageStoreFactory,
SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory,
int queueCapacity )
Expand Down
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,34 @@
public class ThreadedSocketInitiator extends AbstractSocketInitiator {
private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy;

private ThreadedSocketInitiator(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, ThreadedSocketInitiator> {
private Builder() {
super(Builder.class);
}

@Override
protected ThreadedSocketInitiator doBuild() throws ConfigError {
return new ThreadedSocketInitiator(this);
}
}

public ThreadedSocketInitiator(Application application,
MessageStoreFactory messageStoreFactory, SessionSettings settings,
LogFactory logFactory, MessageFactory messageFactory, int queueCapacity) throws ConfigError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
* it only handles message reception events.
*/
public interface EventHandlingStrategy {

/**
* Constant indicating how long we wait for an incoming message. After
* thread has been asked to stop, it can take up to this long to terminate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,7 @@ public String getRemoteAddress() {
return null;
}

IoSession getIoSession() {
return ioSession;
}
}
11 changes: 11 additions & 0 deletions quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package quickfix.mina;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

interface QueueTracker<E> {
void put(E e) throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int drainTo(Collection<E> collection);
}
89 changes: 89 additions & 0 deletions quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package quickfix.mina;

import org.apache.mina.core.session.IoSession;
import quickfix.Responder;
import quickfix.Session;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.lang.String.format;

/**
* Queue trackers factory methods
*/
final class QueueTrackers {
private static final String LOWER_WATERMARK_FMT = "inbound queue size < lower watermark (%d), socket reads resumed";
private static final String UPPER_WATERMARK_FMT = "inbound queue size > upper watermark (%d), socket reads suspended";

/**
* Watermarks-based queue tracker
*/
static <E> WatermarkTracker<E, Session> newMultiSessionWatermarkTracker(
BlockingQueue<E> queue,
long lowerWatermark, long upperWatermark,
Function<E, Session> classifier) {
return WatermarkTracker.newMulti(queue, lowerWatermark, upperWatermark, classifier,
qfSession -> resumeReads(qfSession, (int)lowerWatermark),
qfSession -> suspendReads(qfSession, (int)upperWatermark));
}

/**
* Default no-op queue tracker
*/
static <E> QueueTracker<E> newDefaultQueueTracker(BlockingQueue<E> queue) {
return new QueueTracker<E>() {
@Override
public void put(E e) throws InterruptedException {
queue.put(e);
}

@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}

@Override
public int drainTo(Collection<E> collection) {
return queue.drainTo(collection);
}
};
}

private static IoSession lookupIoSession(Session qfSession) {
final Responder responder = qfSession.getResponder();

if (responder instanceof IoSessionResponder) {
return ((IoSessionResponder)responder).getIoSession();
} else {
return null;
}
}

private static void resumeReads(Session qfSession, int queueLowerWatermark) {
final IoSession ioSession = lookupIoSession(qfSession);
if (ioSession != null && ioSession.isReadSuspended()) {
ioSession.resumeRead();
qfSession.getLog().onEvent(format(LOWER_WATERMARK_FMT, queueLowerWatermark));
}
}

private static void suspendReads(Session qfSession, int queueUpperWatermark) {
final IoSession ioSession = lookupIoSession(qfSession);
if (ioSession != null && !ioSession.isReadSuspended()) {
ioSession.suspendRead();
qfSession.getLog().onEvent(format(UPPER_WATERMARK_FMT, queueUpperWatermark));
}
}

static <E, Void> WatermarkTracker<E, Void> newSingleSessionWatermarkTracker(
BlockingQueue<E> queue,
long lowerWatermark, long upperWatermark,
Session qfSession) {
return WatermarkTracker.newMono(queue, lowerWatermark, upperWatermark,
() -> resumeReads(qfSession, (int)lowerWatermark),
() -> suspendReads(qfSession, (int)upperWatermark));
}
}
Loading