Skip to content

QFJ-943: optional watermarks-based back pressure propagation from inbo… #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 6, 2018

Conversation

mrbald
Copy link
Contributor

@mrbald mrbald commented Jan 28, 2018

The background for this change is described in the QFJ-943

To make sure writes to the inbound FIX Message queue don't block the MINA IO loop a blocking queue can be replaced with watermarks-based (configured with lower and upper watermark values instead of fixed queue capacity.

When using watermarks the queue is no longer limited in size.
Instead, when queue grows above the upper watermark, the socket reads of the respective session are suspended by unsubscribing from reads with MINA selector (IoSession.suspendRead()), nicely propagating back pressure onto the remote end of the TCP session by the TCP flow control.

When the queue drains below the lower watermark the socket is resubscribed for reads (IoSession.resumeRead()and the back pressure is relieved.

The functionality is implemented with a standalone class WatermarkTracker.

Watermarks - enabled queue in SPSC scenario shows somewhat near 300ns enqueue/dequeue times, which should not be a problem considering general latencies of QuickFIXj, so I left it with JDK collections.

The implementation is sandboxed / benchmarked in https://github.com/mrbald/ufwj. Additionally, the threaded initiator tested under load in a prod-like environment

NB: watermarks do not save the day if one side of the fix session outperforms the other side on the average. If socket reads get disabled for long enough the receiver starts missing heartbeats and eventually drops the link.

@chrjohn
Copy link
Member

chrjohn commented Jan 30, 2018

Hi @mrbald , thanks for the PR and the benchmarks you did.
Will take a more thorough look at it soon.
Cheers
Chris.

long lowerWatermark, long upperWatermark,
Function<E, S> classifier,
Consumer<S> onLowerWatermarkCrossed, Consumer<S> onUpperWatermarkCrossed) {
assert lowerWatermark >= 0 && lowerWatermark < upperWatermark;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of assertions in non-test code. Assertions are disabled by default anyway. Could you please remove them? Thanks

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a documenting assertion (as you said, they are disabled in prod). Anyway, I'll remove in the next commit.

BlockingQueue<E> queue,
long lowerWatermark, long upperWatermark,
Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) {
assert lowerWatermark >= 0 && lowerWatermark < upperWatermark;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of assertions in non-test code. Assertions are disabled by default anyway. Could you please remove them? Thanks

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a documenting assertion (as you said, they are disabled in prod). Anyway, I'll remove in the next commit.

******************************************************************************/

/*
* Copyright (c) 2018 Vladimir Lysyy (mrbald@github)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we can mix different licenses and copyrights here. Will have to check.

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to "donate" it to any license, and strip this comment. Will include in the next commit.

}
});
}
private static IoSession lookupIoSession(Session qfSession) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a similar method in the other EventHandlingStrategy. Can this be moved to the super-interface maybe?

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be possible. static or default method on the EventHandlingStrategy interface sounds ok?

final IoSession ioSession = lookupIoSession(qfSession);
if (ioSession != null && ioSession.isReadSuspended()) {
ioSession.resumeRead();
LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this should be DEBUG logging.

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. But could you please use qfSession.getLog().onEvent( logMsg )? That way you don't have to specify the sessionID and the message gets written to the correct event log file. Please note that you cannot use "{ }" in the log String then.

final IoSession ioSession = lookupIoSession(qfSession);
if (ioSession != null && !ioSession.isReadSuspended()) {
ioSession.suspendRead();
LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this should be DEBUG logging.

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They point to a slow consumer problem - something to be picked up by prod monitoring and actioned upon, therefore definitely not debug. Also, if/when they are logged frequently (up/down/up/down) it is a sign that they should be reconfigured. But I understand there may be established rules for what to log when in QuickFIX, so happy to change. Let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. But could you please use qfSession.getLog().onEvent( logMsg )? That way you don't have to specify the sessionID and the message gets written to the correct event log file. Please note that you cannot use "{ }" in the log String then.

</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources</source>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this add all the generated source files to the JAR file? Is this needed? They are generated on a fresh build anyway.

Copy link
Contributor Author

@mrbald mrbald Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this helps IntelliJ automatically detect the generated files (this was the reason), if they also get added to the *-sources.jar I'd love it too as generated or not, sometimes it's nice to click down to sources to see what they do.

Having said the above - if there are reservations about this change, let's drop it. Let me know then - I'll update the pull request.

@chrjohn chrjohn added this to the QFJ 2.1.0 milestone Jan 30, 2018
@mrbald
Copy link
Contributor Author

mrbald commented Jan 30, 2018

@chrjohn - pushed the requested changes. My production code is on 1.6.x - I'd like it in a 1.6.x patch, if you plan to do more of those. Would that need a pull request into the 1.6 branch?

@chrjohn chrjohn changed the title QFJ-943: optional watermarks-based back pressure propagaion from inbo… QFJ-943: optional watermarks-based back pressure propagation from inbo… Feb 1, 2018
@chrjohn
Copy link
Member

chrjohn commented Feb 1, 2018

Hi @mrbald , there will not be any 1.6.x release anymore. Even this change is probably more a 2.1.x change (than 2.0.x) since it introduces new functionality.

@chrjohn
Copy link
Member

chrjohn commented Feb 5, 2018

Hi @mrbald ,
regarding the benchmarks. How do I run them? I tried the usual JMH way with building the JAR and then doing java -jar <benchmarkfile> but it did not work.
Thanks,
Chris.

@mrbald
Copy link
Contributor Author

mrbald commented Feb 6, 2018

Hi @chrjohn,

Benchmarks - I just run them from the IDE - open the benchmark source code (say net/bobah/ufwj/queue/WatermarkTrackerBenchmark.java, and "Run as Java Application...". It forks the JVM, so the fact or running from the IDE should not disturb numbers.

Importantly, JMH does code generation during compilation so the IDE should be asked to do the same when building (IntelliJ does if the repo is imported as Maven project).

It should be possible to run the same class from the command line, or with an appropriate maven plugin as well, just that I haven't done it that way in that repo.

Let me know if you have problems running it, I'll get something command-line based to work.

if (ioSession != null && ioSession.isReadSuspended()) {
ioSession.resumeRead();
LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed",
quickfixSession.getSessionID(), queueLowerWatermark);
Copy link
Member

@chrjohn chrjohn Feb 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments on logging in SingleThreadedEventHandlingStrategy.

@chrjohn
Copy link
Member

chrjohn commented Feb 14, 2018

Hi @mrbald ,
I use Netbeans but I had to change the POM for it to work but looking good now.
Thanks

@mrbald
Copy link
Contributor Author

mrbald commented Feb 16, 2018

@chrjohn, logging updated, please review. if you don't mind, can you please share the changes you needed to do to the pom file to run the benchmarks?

Copy link
Member

@chrjohn chrjohn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, did a review once more. Thanks for checking.

******************************************************************************/

/*
* Copyright (c) 2018 Vladimir Lysyy (mrbald@github)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please remove this copyright/license? You can use "@author" if desired. Thanks

/**
* Processes messages for all sessions in a single thread.
*/
public class SingleThreadedEventHandlingStrategy implements EventHandlingStrategy {
private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG shouldn't be required anymore now.

/**
* Processes messages in a session-specific thread.
*/
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {
private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG shouldn't be required anymore now.

queueTracker = WatermarkTracker.newMulti(eventQueue, queueLowerWatermark, queueUpperWatermark,
evt -> evt.quickfixSession,
qfSession -> { // lower watermark crossed down, while reads suspended
final IoSession ioSession = lookupIoSession(qfSession);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The onWatermarkCrossed actions are duplicated in SingleThreadedEventHandlingStrategy and ThreadPerSessionEventHandlingStrategy. Can this be unified somehow?

@chrjohn
Copy link
Member

chrjohn commented Feb 16, 2018

Hi @mrbald ,
I had to add the following to the benchmark POM:

<plugin>
               <groupId>org.codehaus.mojo</groupId>
               <artifactId>exec-maven-plugin</artifactId>
               <executions>
                   <execution>
                       <id>run-benchmarks</id>
                       <phase>integration-test</phase>
                       <goals>
                           <goal>exec</goal>
                       </goals>
                       <configuration>
                           <classpathScope>test</classpathScope>
                           <executable>java</executable>
                           <arguments>
                               <argument>-classpath</argument>
                               <classpath />
                               <argument>org.openjdk.jmh.Main</argument>
                               <argument>.*</argument>
                           </arguments>
                       </configuration>
                   </execution>
               </executions>
           </plugin>

This ran the benchmarks on build. I could then also run the benchmarks alone because the classes and information in META-INF were generated. I had the following error without these changes:
Unable to find the resource: /META-INF/BenchmarkList

Another question: which one of the benchmarks is the one where the "threaded initiator tested under load in a prod-like environment" (as mentioned in description of this PR)?

Thanks in advance!

@chrjohn chrjohn merged commit 746a35b into quickfix-j:master Mar 6, 2018
@chrjohn
Copy link
Member

chrjohn commented Mar 7, 2018

Thanks @mrbald

@mrbald mrbald deleted the jira/QFJ-943 branch November 9, 2018 22:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants