Skip to content

QFJ-943: optional watermarks-based back pressure propagation from inbo… #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions quickfixj-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,24 @@
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.9.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources</source>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this add all the generated source files to the JAR file? Is this needed? They are generated on a fresh build anyway.

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this helps IntelliJ automatically detect the generated files (this was the reason), if they also get added to the *-sources.jar I'd love it too as generated or not, sometimes it's nice to click down to sources to see what they do.

Having said the above - if there are reservations about this change, let's drop it. Let me know then - I'll update the pull request.

</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package quickfix;

public abstract class AbstractSessionConnectorBuilder<Derived, Product> {
private final Class<Derived> derived;
Application application;
MessageStoreFactory messageStoreFactory;
SessionSettings settings;
LogFactory logFactory;
MessageFactory messageFactory;

int queueCapacity = -1;
int queueLowerWatermark = -1;
int queueUpperWatermark = -1;

AbstractSessionConnectorBuilder(Class<Derived> derived) {
this.derived = derived;
}

public Derived withApplication(Application val) throws ConfigError {
application = val;
return derived.cast(this);
}

public Derived withMessageStoreFactory(MessageStoreFactory val) throws ConfigError {
messageStoreFactory = val;
return derived.cast(this);
}

public Derived withSettings(SessionSettings val) {
settings = val;
return derived.cast(this);
}

public Derived withLogFactory(LogFactory val) throws ConfigError {
logFactory = val;
return derived.cast(this);
}

public Derived withMessageFactory(MessageFactory val) throws ConfigError {
messageFactory = val;
return derived.cast(this);
}

public Derived withQueueCapacity(int val) throws ConfigError {
if (queueLowerWatermark >= 0) {
throw new ConfigError("queue capacity and watermarks may not be configured together");
} else if (queueCapacity < 0) {
throw new ConfigError("negative queue capacity");
}
queueCapacity = val;
return derived.cast(this);
}

public Derived withQueueWatermarks(int lower, int upper) throws ConfigError {
if (queueCapacity >= 0) {
throw new ConfigError("queue capacity and watermarks may not be configured together");
} else if (queueLowerWatermark < 0 || queueUpperWatermark <= queueLowerWatermark) {
throw new ConfigError("invalid queue watermarks, required: 0 <= lower watermark < upper watermark");
}
queueLowerWatermark = lower;
queueUpperWatermark = upper;
return derived.cast(this);
}

public final Product build() throws ConfigError {
if (logFactory == null) {
logFactory = new ScreenLogFactory(settings);
}

return doBuild();
}

protected abstract Product doBuild() throws ConfigError;
}
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/SocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ public class SocketAcceptor extends AbstractSocketAcceptor {
private volatile Boolean isStarted = Boolean.FALSE;
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;

private SocketAcceptor(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, SocketAcceptor> {
private Builder() {
super(Builder.class);
}

@Override
protected SocketAcceptor doBuild() throws ConfigError {
return new SocketAcceptor(this);
}
}

public SocketAcceptor(Application application, MessageStoreFactory messageStoreFactory,
SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory,
int queueCapacity)
Expand Down
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/SocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ public class SocketInitiator extends AbstractSocketInitiator {
private volatile Boolean isStarted = Boolean.FALSE;
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;

private SocketInitiator(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, SocketInitiator> {
private Builder() {
super(Builder.class);
}

@Override
protected SocketInitiator doBuild() throws ConfigError {
return new SocketInitiator(this);
}
}

public SocketInitiator(Application application, MessageStoreFactory messageStoreFactory,
SessionSettings settings, MessageFactory messageFactory, int queueCapacity) throws ConfigError {
super(application, messageStoreFactory, settings, new ScreenLogFactory(settings),
Expand Down
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,34 @@
public class ThreadedSocketAcceptor extends AbstractSocketAcceptor {
private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy;

private ThreadedSocketAcceptor(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, ThreadedSocketAcceptor> {
private Builder() {
super(Builder.class);
}

@Override
protected ThreadedSocketAcceptor doBuild() throws ConfigError {
return new ThreadedSocketAcceptor(this);
}
}

public ThreadedSocketAcceptor(Application application, MessageStoreFactory messageStoreFactory,
SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory,
int queueCapacity )
Expand Down
28 changes: 28 additions & 0 deletions quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,34 @@
public class ThreadedSocketInitiator extends AbstractSocketInitiator {
private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy;

private ThreadedSocketInitiator(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity);
} else {
eventHandlingStrategy
= new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark);
}
}

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, ThreadedSocketInitiator> {
private Builder() {
super(Builder.class);
}

@Override
protected ThreadedSocketInitiator doBuild() throws ConfigError {
return new ThreadedSocketInitiator(this);
}
}

public ThreadedSocketInitiator(Application application,
MessageStoreFactory messageStoreFactory, SessionSettings settings,
LogFactory logFactory, MessageFactory messageFactory, int queueCapacity) throws ConfigError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,7 @@ public String getRemoteAddress() {
return null;
}

IoSession getIoSession() {
return ioSession;
}
}
30 changes: 30 additions & 0 deletions quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package quickfix.mina;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

interface QueueTracker<E> {
void put(E e) throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int drainTo(Collection<E> collection);

static <E> QueueTracker<E> wrap(BlockingQueue<E> queue) {
return new QueueTracker<E>() {
@Override
public void put(E e) throws InterruptedException {
queue.put(e);
}

@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}

@Override
public int drainTo(Collection<E> collection) {
return queue.drainTo(collection);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@

package quickfix.mina;

import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SystemTime;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -38,8 +37,11 @@
* Processes messages for all sessions in a single thread.
*/
public class SingleThreadedEventHandlingStrategy implements EventHandlingStrategy {
private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG shouldn't be required anymore now.


public static final String MESSAGE_PROCESSOR_THREAD_NAME = "QFJ Message Processor";
private final BlockingQueue<SessionMessageEvent> eventQueue;
private final QueueTracker<SessionMessageEvent> queueTracker;
private final SessionConnector sessionConnector;
private volatile ThreadAdapter messageProcessingThread;
private volatile boolean isStopped;
Expand All @@ -49,6 +51,39 @@ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrateg
public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueCapacity) {
sessionConnector = connector;
eventQueue = new LinkedBlockingQueue<>(queueCapacity);
queueTracker = QueueTracker.wrap(eventQueue);
}

public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) {
sessionConnector = connector;
eventQueue = new LinkedBlockingQueue<>();
queueTracker = WatermarkTracker.newMulti(eventQueue, queueLowerWatermark, queueUpperWatermark,
evt -> evt.quickfixSession,
qfSession -> { // lower watermark crossed down, while reads suspended
final IoSession ioSession = lookupIoSession(qfSession);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The onWatermarkCrossed actions are duplicated in SingleThreadedEventHandlingStrategy and ThreadPerSessionEventHandlingStrategy. Can this be unified somehow?

if (ioSession != null && ioSession.isReadSuspended()) {
ioSession.resumeRead();
LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this should be DEBUG logging.

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. But could you please use qfSession.getLog().onEvent( logMsg )? That way you don't have to specify the sessionID and the message gets written to the correct event log file. Please note that you cannot use "{ }" in the log String then.

qfSession.getSessionID(), queueLowerWatermark);
}
},
qfSession -> { // upper watermark crossed up, while reads active
final IoSession ioSession = lookupIoSession(qfSession);
if (ioSession != null && !ioSession.isReadSuspended()) {
ioSession.suspendRead();
LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this should be DEBUG logging.

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. But could you please use qfSession.getLog().onEvent( logMsg )? That way you don't have to specify the sessionID and the message gets written to the correct event log file. Please note that you cannot use "{ }" in the log String then.

qfSession.getSessionID(), queueUpperWatermark);
}
});
}
private static IoSession lookupIoSession(Session qfSession) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a similar method in the other EventHandlingStrategy. Can this be moved to the super-interface maybe?

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be possible. static or default method on the EventHandlingStrategy interface sounds ok?

final Responder responder = qfSession.getResponder();

if (responder instanceof IoSessionResponder) {
return ((IoSessionResponder)responder).getIoSession();
} else {
return null;
}
}

public void setExecutor(Executor executor) {
Expand All @@ -61,7 +96,7 @@ public void onMessage(Session quickfixSession, Message message) {
return;
}
try {
eventQueue.put(new SessionMessageEvent(quickfixSession, message));
queueTracker.put(new SessionMessageEvent(quickfixSession, message));
} catch (InterruptedException e) {
isStopped = true;
throw new RuntimeException(e);
Expand All @@ -79,7 +114,7 @@ public void block() {
if (isStopped) {
if (!eventQueue.isEmpty()) {
final List<SessionMessageEvent> tempList = new ArrayList<>();
eventQueue.drainTo(tempList);
queueTracker.drainTo(tempList);
for (SessionMessageEvent event : tempList) {
event.processMessage();
}
Expand Down Expand Up @@ -107,7 +142,7 @@ public void block() {
}

private SessionMessageEvent getMessage() throws InterruptedException {
return eventQueue.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS);
return queueTracker.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS);
}

/**
Expand Down
Loading