-
Notifications
You must be signed in to change notification settings - Fork 648
QFJ-943: optional watermarks-based back pressure propagation from inbo… #168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
2ea92ac
b6a7131
9cccdb3
a65915e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package quickfix; | ||
|
||
public abstract class AbstractSessionConnectorBuilder<Derived, Product> { | ||
private final Class<Derived> derived; | ||
Application application; | ||
MessageStoreFactory messageStoreFactory; | ||
SessionSettings settings; | ||
LogFactory logFactory; | ||
MessageFactory messageFactory; | ||
|
||
int queueCapacity = -1; | ||
int queueLowerWatermark = -1; | ||
int queueUpperWatermark = -1; | ||
|
||
AbstractSessionConnectorBuilder(Class<Derived> derived) { | ||
this.derived = derived; | ||
} | ||
|
||
public Derived withApplication(Application val) throws ConfigError { | ||
application = val; | ||
return derived.cast(this); | ||
} | ||
|
||
public Derived withMessageStoreFactory(MessageStoreFactory val) throws ConfigError { | ||
messageStoreFactory = val; | ||
return derived.cast(this); | ||
} | ||
|
||
public Derived withSettings(SessionSettings val) { | ||
settings = val; | ||
return derived.cast(this); | ||
} | ||
|
||
public Derived withLogFactory(LogFactory val) throws ConfigError { | ||
logFactory = val; | ||
return derived.cast(this); | ||
} | ||
|
||
public Derived withMessageFactory(MessageFactory val) throws ConfigError { | ||
messageFactory = val; | ||
return derived.cast(this); | ||
} | ||
|
||
public Derived withQueueCapacity(int val) throws ConfigError { | ||
if (queueLowerWatermark >= 0) { | ||
throw new ConfigError("queue capacity and watermarks may not be configured together"); | ||
} else if (queueCapacity < 0) { | ||
throw new ConfigError("negative queue capacity"); | ||
} | ||
queueCapacity = val; | ||
return derived.cast(this); | ||
} | ||
|
||
public Derived withQueueWatermarks(int lower, int upper) throws ConfigError { | ||
if (queueCapacity >= 0) { | ||
throw new ConfigError("queue capacity and watermarks may not be configured together"); | ||
} else if (queueLowerWatermark < 0 || queueUpperWatermark <= queueLowerWatermark) { | ||
throw new ConfigError("invalid queue watermarks, required: 0 <= lower watermark < upper watermark"); | ||
} | ||
queueLowerWatermark = lower; | ||
queueUpperWatermark = upper; | ||
return derived.cast(this); | ||
} | ||
|
||
public final Product build() throws ConfigError { | ||
if (logFactory == null) { | ||
logFactory = new ScreenLogFactory(settings); | ||
} | ||
|
||
return doBuild(); | ||
} | ||
|
||
protected abstract Product doBuild() throws ConfigError; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,4 +95,7 @@ public String getRemoteAddress() { | |
return null; | ||
} | ||
|
||
IoSession getIoSession() { | ||
return ioSession; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package quickfix.mina; | ||
|
||
import java.util.Collection; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
interface QueueTracker<E> { | ||
void put(E e) throws InterruptedException; | ||
E poll(long timeout, TimeUnit unit) throws InterruptedException; | ||
int drainTo(Collection<E> collection); | ||
|
||
static <E> QueueTracker<E> wrap(BlockingQueue<E> queue) { | ||
return new QueueTracker<E>() { | ||
@Override | ||
public void put(E e) throws InterruptedException { | ||
queue.put(e); | ||
} | ||
|
||
@Override | ||
public E poll(long timeout, TimeUnit unit) throws InterruptedException { | ||
return queue.poll(timeout, unit); | ||
} | ||
|
||
@Override | ||
public int drainTo(Collection<E> collection) { | ||
return queue.drainTo(collection); | ||
} | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,11 +20,10 @@ | |
|
||
package quickfix.mina; | ||
|
||
import quickfix.LogUtil; | ||
import quickfix.Message; | ||
import quickfix.Session; | ||
import quickfix.SessionID; | ||
import quickfix.SystemTime; | ||
import org.apache.mina.core.session.IoSession; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import quickfix.*; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
@@ -38,8 +37,11 @@ | |
* Processes messages for all sessions in a single thread. | ||
*/ | ||
public class SingleThreadedEventHandlingStrategy implements EventHandlingStrategy { | ||
private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LOG shouldn't be required anymore now. |
||
|
||
public static final String MESSAGE_PROCESSOR_THREAD_NAME = "QFJ Message Processor"; | ||
private final BlockingQueue<SessionMessageEvent> eventQueue; | ||
private final QueueTracker<SessionMessageEvent> queueTracker; | ||
private final SessionConnector sessionConnector; | ||
private volatile ThreadAdapter messageProcessingThread; | ||
private volatile boolean isStopped; | ||
|
@@ -49,6 +51,39 @@ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrateg | |
public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueCapacity) { | ||
sessionConnector = connector; | ||
eventQueue = new LinkedBlockingQueue<>(queueCapacity); | ||
queueTracker = QueueTracker.wrap(eventQueue); | ||
} | ||
|
||
public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) { | ||
sessionConnector = connector; | ||
eventQueue = new LinkedBlockingQueue<>(); | ||
queueTracker = WatermarkTracker.newMulti(eventQueue, queueLowerWatermark, queueUpperWatermark, | ||
evt -> evt.quickfixSession, | ||
qfSession -> { // lower watermark crossed down, while reads suspended | ||
final IoSession ioSession = lookupIoSession(qfSession); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The onWatermarkCrossed actions are duplicated in SingleThreadedEventHandlingStrategy and ThreadPerSessionEventHandlingStrategy. Can this be unified somehow? |
||
if (ioSession != null && ioSession.isReadSuspended()) { | ||
ioSession.resumeRead(); | ||
LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO this should be DEBUG logging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. But could you please use |
||
qfSession.getSessionID(), queueLowerWatermark); | ||
} | ||
}, | ||
qfSession -> { // upper watermark crossed up, while reads active | ||
final IoSession ioSession = lookupIoSession(qfSession); | ||
if (ioSession != null && !ioSession.isReadSuspended()) { | ||
ioSession.suspendRead(); | ||
LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO this should be DEBUG logging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. But could you please use |
||
qfSession.getSessionID(), queueUpperWatermark); | ||
} | ||
}); | ||
} | ||
private static IoSession lookupIoSession(Session qfSession) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a similar method in the other EventHandlingStrategy. Can this be moved to the super-interface maybe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be possible. static or default method on the |
||
final Responder responder = qfSession.getResponder(); | ||
|
||
if (responder instanceof IoSessionResponder) { | ||
return ((IoSessionResponder)responder).getIoSession(); | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
public void setExecutor(Executor executor) { | ||
|
@@ -61,7 +96,7 @@ public void onMessage(Session quickfixSession, Message message) { | |
return; | ||
} | ||
try { | ||
eventQueue.put(new SessionMessageEvent(quickfixSession, message)); | ||
queueTracker.put(new SessionMessageEvent(quickfixSession, message)); | ||
} catch (InterruptedException e) { | ||
isStopped = true; | ||
throw new RuntimeException(e); | ||
|
@@ -79,7 +114,7 @@ public void block() { | |
if (isStopped) { | ||
if (!eventQueue.isEmpty()) { | ||
final List<SessionMessageEvent> tempList = new ArrayList<>(); | ||
eventQueue.drainTo(tempList); | ||
queueTracker.drainTo(tempList); | ||
for (SessionMessageEvent event : tempList) { | ||
event.processMessage(); | ||
} | ||
|
@@ -107,7 +142,7 @@ public void block() { | |
} | ||
|
||
private SessionMessageEvent getMessage() throws InterruptedException { | ||
return eventQueue.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); | ||
return queueTracker.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this add all the generated source files to the JAR file? Is this needed? They are generated on a fresh build anyway.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this helps IntelliJ automatically detect the generated files (this was the reason), if they also get added to the *-sources.jar I'd love it too as generated or not, sometimes it's nice to click down to sources to see what they do.
Having said the above - if there are reservations about this change, let's drop it. Let me know then - I'll update the pull request.