Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use BlockingQueue for predictions #9107

Merged
merged 3 commits into from
Oct 25, 2021
Merged

Conversation

jimgrace
Copy link
Member

@jimgrace jimgrace commented Oct 21, 2021

Background

PredictionDataValueFetcher uses a second thread to fetch (deflated) aggregate DataValues. This is done for performance reasons. It allows a single SQL query to fetch all the aggregate input data for a predictor in a new thread, and pipelines the data into the prediction engine running in the main thread.

Refactor Purpose

This refactor targets the code within PredictionDataValueFetcher that coordinates between the two threads. It had used a callback method from HibernateDataValueStore to pass the values on the second thread back into PredictionDataValueFetcher, and used producer and consumer Semaphores to coordinate between the two threads. It now uses a BlockingQueue, so the (deflated) data values can be queued by HibernateDataValueStore from the second thread and dequeued by PredictionDataValueFetcher on the primary thread.

There are two reasons for this refactor: (1) it attempts to address occasional PredictionServiceTest integration test fails during builds (and the more serious possibility that the code might malfunction on a production instance), and (2) it makes better use of Java concurrency features for more reliable and maintainable code, instead of hand-crafted Semaphore logic.

Integration test failures

I was unable to reproduce the integration test failures on my machine. I've been collecting some stack traces over the last few months, and the most frequent failing test I've seen is PredictionServiceTest.testPredictSequential. I tried running this test in a loop overnight (700,000+ runs) but it did not fail once. It appears that there is some quality of the build test environment that is not present on my machine, that affects this test in some way, possibly a race condition between the threads that was not properly handled by the code.

I also saw from the build environment one failure of PredictionDataValueFetcherTest.testGetDataValuesWithFastDb. This test has been removed from trunk (with no explanation as to why), but it was originally written to verify that some possible race conditions between the producer and consumer threads were properly handled. I copied this test from the old code and also ran it in a loop, 20,000+ times. It did not fail on my machine.

I had hoped to find a way to reproduce the test failure on my machine, so I could then attempt a fix and rerun the test to see if the fix worked. I was not able to do this. It is possible that the tests were failing because of a logic error in handling the Semaphores. By replacing Semaphores with BlockingQueue, I hope that this will prevent the integration test failures, but I have no way to know this.

What the new code does

At first, I looked for ways of using Future or CompletableFuture as replacement for Semaphores. I did not find a natural way to use them for the producer-consumer pattern needed here. But as I was looking around at other concurrency mechanisms, BlockingQueue seemed to be a good fit for this pattern. One thread can supply data to the queue (blocked until there is space), and the other thread can remove the data (blocked until data has been supplied).

A special, static DeflatedDataValue instance DataValueStore.END_OF_DDV_DATA is used as an End of file marker in the queue. After all the DeflatedDataValues have been posted by HibernateDataValueStore.getDeflatedDataValues, it then posts this special value. This is detected by PredictionDataValueFetcher.getNextDataValue by testing the object reference using ==.

Calls to queue and dequeue the data use timeouts instead of waiting indefinitely for the queue. On the producer side, this is to prevent the thread from waiting forever in case the main thread has been unexpectedly terminated. On the consumer side, this is to prevent the main thread from waiting forever if the producer thread has terminated for some reason without queueing the next value (although see the next paragraph). The wait time is set to 10 minutes (the same amount of time that the original code waited to clean up orphan threads.) This seems on the long side, but should be plenty long to not interfere with successful operation while not leaving orphan threads hanging around forever.

However, any unexpected producer thread runtime exceptions are caught, and stored in a variable that the consumer thread can read, and then END_OF_DDV_DATA is queued in case the consumer is blocking waiting for the next entry. The consumer then re-throws the exception that was caught on the producer thread. (This functionality is unchanged by this refactor.)

Performance

The new code using BlockingQueue causes predictors in the tests I ran to perform very slightly worse (about 1% worse) than the previous code using Semaphores, even after tweaking the BlockingQueue size to optimize its performance. This was a surprise to me, but something in the queueing and/or synchronization code of BlockingQueue appears to perform slightly worse than using the Semaphores directly.

A BlockingQueue maximum size of 1 would be perfectly adequate for this application. (The Semaphore-based code just passed one value at a time between threads.) In a test with a realistically large system, a max queue size of 5 performed better than 1 or 2. It also performed better than 10 or 20. It appears that having more than 1 or 2 entries benefits from the parallelism of moving the next values from the SqlRowSet into the queue at the same time as earlier values are being processed by the prediction engine on the other thread. But it appears that having 10 or 20 entries has no further benefit from the parallelism while requiring more time to manage the queue. The implementation uses a max queue size of 5 for best performance, which still makes it about 1% slower than the Semaphore implementation. This is a very small price to pay for code that is more reliable and maintainable.

The BlockingQueue used is ArrayBlockingQueue. I also tried LinkedBlockingQueue, but it performed less well in the tests I ran.

@jbee
Copy link
Contributor

jbee commented Oct 22, 2021

Will have a look on Monday.

Copy link
Contributor

@jbee jbee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you chose a good plan of addressing the issue. I am a bit surprised by your performance tests results. It makes sense to me that a very small queue isn't as good but a fixed capacity queue of 10-20 when backed by an array should at least be as good as capacity 5. This is a bit odd. Anyhow, good you did some tests and I think the idea now is a lot more clear and therefore easier to maintain. Hopefully this does fix the hickups on CI.

There were some smaller points but nothing that necessarily must be addressed now.

@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 1 Bug
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

0.0% 0.0% Coverage
0.0% 0.0% Duplication

@jimgrace jimgrace merged commit e5d7c0c into master Oct 25, 2021
@jimgrace jimgrace deleted the prediction-blockingqueue branch October 25, 2021 17:16
jimgrace added a commit that referenced this pull request Oct 25, 2021
* refactor: Use BlockingQueue for predictions

* fix: SonarCloud errors

* fix: respond to minor review comments

(cherry picked from commit e5d7c0c)
jimgrace added a commit that referenced this pull request Oct 25, 2021
* refactor: Use BlockingQueue for predictions

* fix: SonarCloud errors

* fix: respond to minor review comments

(cherry picked from commit e5d7c0c)
jimgrace added a commit that referenced this pull request Oct 25, 2021
* refactor: Use BlockingQueue for predictions

* fix: SonarCloud errors

* fix: respond to minor review comments

(cherry picked from commit e5d7c0c)
jimgrace added a commit that referenced this pull request Oct 25, 2021
* refactor: Use BlockingQueue for predictions

* fix: SonarCloud errors

* fix: respond to minor review comments

(cherry picked from commit e5d7c0c)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants