-
Notifications
You must be signed in to change notification settings - Fork 643
QFJ-943: optional watermarks-based back pressure propagation from inbo… #168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…und queue to the socket
Hi @mrbald , thanks for the PR and the benchmarks you did. |
long lowerWatermark, long upperWatermark, | ||
Function<E, S> classifier, | ||
Consumer<S> onLowerWatermarkCrossed, Consumer<S> onUpperWatermarkCrossed) { | ||
assert lowerWatermark >= 0 && lowerWatermark < upperWatermark; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not a fan of assertions in non-test code. Assertions are disabled by default anyway. Could you please remove them? Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a documenting assertion (as you said, they are disabled in prod). Anyway, I'll remove in the next commit.
BlockingQueue<E> queue, | ||
long lowerWatermark, long upperWatermark, | ||
Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) { | ||
assert lowerWatermark >= 0 && lowerWatermark < upperWatermark; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not a fan of assertions in non-test code. Assertions are disabled by default anyway. Could you please remove them? Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a documenting assertion (as you said, they are disabled in prod). Anyway, I'll remove in the next commit.
******************************************************************************/ | ||
|
||
/* | ||
* Copyright (c) 2018 Vladimir Lysyy (mrbald@github) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we can mix different licenses and copyrights here. Will have to check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to "donate" it to any license, and strip this comment. Will include in the next commit.
} | ||
}); | ||
} | ||
private static IoSession lookupIoSession(Session qfSession) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a similar method in the other EventHandlingStrategy. Can this be moved to the super-interface maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be possible. static or default method on the EventHandlingStrategy
interface sounds ok?
final IoSession ioSession = lookupIoSession(qfSession); | ||
if (ioSession != null && ioSession.isReadSuspended()) { | ||
ioSession.resumeRead(); | ||
LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this should be DEBUG logging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. But could you please use qfSession.getLog().onEvent( logMsg )
? That way you don't have to specify the sessionID and the message gets written to the correct event log file. Please note that you cannot use "{ }" in the log String then.
final IoSession ioSession = lookupIoSession(qfSession); | ||
if (ioSession != null && !ioSession.isReadSuspended()) { | ||
ioSession.suspendRead(); | ||
LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this should be DEBUG logging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. But could you please use qfSession.getLog().onEvent( logMsg )
? That way you don't have to specify the sessionID and the message gets written to the correct event log file. Please note that you cannot use "{ }" in the log String then.
quickfixj-core/pom.xml
Outdated
</goals> | ||
<configuration> | ||
<sources> | ||
<source>${project.build.directory}/generated-sources</source> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this add all the generated source files to the JAR file? Is this needed? They are generated on a fresh build anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this helps IntelliJ automatically detect the generated files (this was the reason), if they also get added to the *-sources.jar I'd love it too as generated or not, sometimes it's nice to click down to sources to see what they do.
Having said the above - if there are reservations about this change, let's drop it. Let me know then - I'll update the pull request.
@chrjohn - pushed the requested changes. My production code is on 1.6.x - I'd like it in a 1.6.x patch, if you plan to do more of those. Would that need a pull request into the 1.6 branch? |
Hi @mrbald , there will not be any 1.6.x release anymore. Even this change is probably more a 2.1.x change (than 2.0.x) since it introduces new functionality. |
Hi @mrbald , |
Hi @chrjohn, Benchmarks - I just run them from the IDE - open the benchmark source code (say net/bobah/ufwj/queue/WatermarkTrackerBenchmark.java, and "Run as Java Application...". It forks the JVM, so the fact or running from the IDE should not disturb numbers. Importantly, JMH does code generation during compilation so the IDE should be asked to do the same when building (IntelliJ does if the repo is imported as Maven project). It should be possible to run the same class from the command line, or with an appropriate maven plugin as well, just that I haven't done it that way in that repo. Let me know if you have problems running it, I'll get something command-line based to work. |
if (ioSession != null && ioSession.isReadSuspended()) { | ||
ioSession.resumeRead(); | ||
LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", | ||
quickfixSession.getSessionID(), queueLowerWatermark); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comments on logging in SingleThreadedEventHandlingStrategy.
Hi @mrbald , |
@chrjohn, logging updated, please review. if you don't mind, can you please share the changes you needed to do to the pom file to run the benchmarks? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, did a review once more. Thanks for checking.
******************************************************************************/ | ||
|
||
/* | ||
* Copyright (c) 2018 Vladimir Lysyy (mrbald@github) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please remove this copyright/license? You can use "@author" if desired. Thanks
/** | ||
* Processes messages for all sessions in a single thread. | ||
*/ | ||
public class SingleThreadedEventHandlingStrategy implements EventHandlingStrategy { | ||
private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG shouldn't be required anymore now.
/** | ||
* Processes messages in a session-specific thread. | ||
*/ | ||
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy { | ||
private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG shouldn't be required anymore now.
queueTracker = WatermarkTracker.newMulti(eventQueue, queueLowerWatermark, queueUpperWatermark, | ||
evt -> evt.quickfixSession, | ||
qfSession -> { // lower watermark crossed down, while reads suspended | ||
final IoSession ioSession = lookupIoSession(qfSession); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The onWatermarkCrossed actions are duplicated in SingleThreadedEventHandlingStrategy and ThreadPerSessionEventHandlingStrategy. Can this be unified somehow?
Hi @mrbald ,
This ran the benchmarks on build. I could then also run the benchmarks alone because the classes and information in META-INF were generated. I had the following error without these changes: Another question: which one of the benchmarks is the one where the "threaded initiator tested under load in a prod-like environment" (as mentioned in description of this PR)? Thanks in advance! |
…eftover mrbald copyright
Thanks @mrbald |
The background for this change is described in the QFJ-943
To make sure writes to the inbound FIX Message queue don't block the MINA IO loop a blocking queue can be replaced with watermarks-based (configured with lower and upper watermark values instead of fixed queue capacity.
When using watermarks the queue is no longer limited in size.
Instead, when queue grows above the upper watermark, the socket reads of the respective session are suspended by unsubscribing from reads with MINA selector (IoSession.suspendRead()), nicely propagating back pressure onto the remote end of the TCP session by the TCP flow control.
When the queue drains below the lower watermark the socket is resubscribed for reads (IoSession.resumeRead()and the back pressure is relieved.
The functionality is implemented with a standalone class WatermarkTracker.
Watermarks - enabled queue in SPSC scenario shows somewhat near 300ns enqueue/dequeue times, which should not be a problem considering general latencies of QuickFIXj, so I left it with JDK collections.
The implementation is sandboxed / benchmarked in https://github.com/mrbald/ufwj. Additionally, the threaded initiator tested under load in a prod-like environment
NB: watermarks do not save the day if one side of the fix session outperforms the other side on the average. If socket reads get disabled for long enough the receiver starts missing heartbeats and eventually drops the link.