refactor: Use BlockingQueue for predictions #9107
Conversation
Will have a look on Monday.
I think you chose a good plan for addressing the issue. I am a bit surprised by your performance test results. It makes sense to me that a very small queue isn't as good, but a fixed-capacity queue of 10-20, when backed by an array, should be at least as good as capacity 5. This is a bit odd. Anyhow, good that you did some tests, and I think the idea is now a lot clearer and therefore easier to maintain. Hopefully this does fix the hiccups on CI.

There were some smaller points, but nothing that necessarily must be addressed now.
Kudos, SonarCloud Quality Gate passed!
* refactor: Use BlockingQueue for predictions
* fix: SonarCloud errors
* fix: respond to minor review comments

(cherry picked from commit e5d7c0c)
Background

`PredictionDataValueFetcher` uses a second thread to fetch (deflated) aggregate `DataValues`. This is done for performance reasons. It allows a single SQL query to fetch all the aggregate input data for a predictor in a new thread, and pipelines the data into the prediction engine running in the main thread.

Refactor Purpose
This refactor targets the code within `PredictionDataValueFetcher` that coordinates between the two threads. It had used a callback method from `HibernateDataValueStore` to pass the values on the second thread back into `PredictionDataValueFetcher`, and used producer and consumer `Semaphores` to coordinate between the two threads. It now uses a `BlockingQueue`, so the (deflated) data values can be queued by `HibernateDataValueStore` from the second thread and dequeued by `PredictionDataValueFetcher` on the primary thread.

There are two reasons for this refactor: (1) it attempts to address occasional `PredictionServiceTest` integration test failures during builds (and the more serious possibility that the code might malfunction on a production instance), and (2) it makes better use of Java concurrency features for more reliable and maintainable code, instead of hand-crafted `Semaphore` logic.

Integration test failures
I was unable to reproduce the integration test failures on my machine. I've been collecting stack traces over the last few months, and the most frequent failing test I've seen is `PredictionServiceTest.testPredictSequential`. I tried running this test in a loop overnight (700,000+ runs) but it did not fail once. It appears that some characteristic of the build test environment that is not present on my machine affects this test in some way, possibly exposing a race condition between the threads that was not properly handled by the code.

I also saw from the build environment one failure of `PredictionDataValueFetcherTest.testGetDataValuesWithFastDb`. This test has been removed from trunk (with no explanation as to why), but it was originally written to verify that some possible race conditions between the producer and consumer threads were properly handled. I copied this test from the old code and also ran it in a loop, 20,000+ times. It did not fail on my machine.

I had hoped to find a way to reproduce the test failure on my machine, so I could then attempt a fix and rerun the test to see whether the fix worked. I was not able to do this. It is possible that the tests were failing because of a logic error in handling the `Semaphores`. By replacing the `Semaphores` with a `BlockingQueue`, I hope to prevent the integration test failures, but I have no way to know this.

What the new code does
At first, I looked for ways of using `Future` or `CompletableFuture` as a replacement for the `Semaphores`. I did not find a natural way to use them for the producer-consumer pattern needed here. But as I was looking around at other concurrency mechanisms, `BlockingQueue` seemed to be a good fit for this pattern. One thread can supply data to the queue (blocking until there is space), and the other thread can remove the data (blocking until data has been supplied).

A special, static `DeflatedDataValue` instance, `DataValueStore.END_OF_DDV_DATA`, is used as an end-of-file marker in the queue. After all the `DeflatedDataValues` have been posted by `HibernateDataValueStore.getDeflatedDataValues`, it then posts this special value. This is detected by `PredictionDataValueFetcher.getNextDataValue` by testing the object reference using `==`.

Calls to queue and dequeue the data use timeouts instead of waiting indefinitely on the queue. On the producer side, this prevents the thread from waiting forever in case the main thread has been unexpectedly terminated. On the consumer side, this prevents the main thread from waiting forever if the producer thread has terminated for some reason without queueing the next value (although see the discussion of producer exceptions below). The wait time is set to 10 minutes, the same amount of time that the original code waited to clean up orphan threads. This seems on the long side, but it should be plenty long not to interfere with successful operation while not leaving orphan threads hanging around forever.
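As a rough illustration of the pattern described above, here is a minimal sketch only; the class, field, and constant names below are made up for illustration and are not the actual `PredictionDataValueFetcher` code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Minimal sketch of the producer-consumer handoff described above.
 * Row, END_OF_DATA, the queue capacity, and the timeout constant are
 * illustrative stand-ins, not the names used in the PR.
 */
public class BlockingQueueHandoffSketch
{
    /** Sentinel marking the end of the data, detected by reference (==). */
    private static final Row END_OF_DATA = new Row( -1 );

    private static final int TIMEOUT_MINUTES = 10;

    private final BlockingQueue<Row> queue = new ArrayBlockingQueue<>( 5 );

    record Row( long id ) {}

    /** Producer side: runs on the second (fetching) thread. */
    void produce( Iterable<Row> rows )
        throws InterruptedException
    {
        for ( Row row : rows )
        {
            // Blocks until there is space, but gives up after the timeout
            // in case the consumer thread has unexpectedly terminated.
            if ( !queue.offer( row, TIMEOUT_MINUTES, TimeUnit.MINUTES ) )
            {
                throw new IllegalStateException( "Timed out queueing row " + row.id() );
            }
        }

        // Signal that all the rows have been queued.
        queue.offer( END_OF_DATA, TIMEOUT_MINUTES, TimeUnit.MINUTES );
    }

    /** Consumer side: runs on the main (prediction) thread. */
    void consume()
        throws InterruptedException
    {
        while ( true )
        {
            // Blocks until data is available, but gives up after the timeout
            // in case the producer thread has terminated without queueing.
            Row row = queue.poll( TIMEOUT_MINUTES, TimeUnit.MINUTES );

            if ( row == null )
            {
                throw new IllegalStateException( "Timed out waiting for the next row" );
            }

            if ( row == END_OF_DATA ) // reference comparison, as described above
            {
                return;
            }

            // ... process the row ...
        }
    }
}
```

In the actual code the queue carries `DeflatedDataValue` objects and the end marker is `DataValueStore.END_OF_DDV_DATA`.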
However, any unexpected producer thread runtime exception is caught and stored in a variable that the consumer thread can read, and then `END_OF_DDV_DATA` is queued in case the consumer is blocked waiting for the next entry. The consumer then re-throws the exception that was caught on the producer thread. (This functionality is unchanged by this refactor.)
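Sketched in the same simplified style, this is roughly what the exception relay looks like. The names and the use of `AtomicReference` here are my assumptions; the PR only describes storing the exception in a variable visible to the consumer and queueing `END_OF_DDV_DATA`:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sketch of the producer-exception relay described above, with made-up names.
 * AtomicReference is one way to make the stored exception safely visible to
 * the consumer thread; the actual code may do this differently.
 */
class ProducerExceptionRelaySketch
{
    private static final Object END_OF_DATA = new Object();

    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>( 5 );

    private final AtomicReference<RuntimeException> producerException = new AtomicReference<>();

    /** Wraps the producer's main loop on the second thread. */
    void runProducer( Runnable fetchAndQueueAll )
        throws InterruptedException
    {
        try
        {
            fetchAndQueueAll.run();
        }
        catch ( RuntimeException ex )
        {
            producerException.set( ex ); // make the failure visible to the consumer

            // Queue the end-of-data marker so a consumer blocked on poll() wakes up.
            queue.offer( END_OF_DATA, 10, TimeUnit.MINUTES );
        }
    }

    /** Called on the consumer (main) thread to fetch the next value, or null at the end. */
    Object next()
        throws InterruptedException
    {
        Object value = queue.poll( 10, TimeUnit.MINUTES );

        RuntimeException ex = producerException.get();
        if ( ex != null )
        {
            throw ex; // re-throw the producer's exception on the main thread
        }

        return value == END_OF_DATA ? null : value;
    }
}
```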
Performance

The new code using `BlockingQueue` causes predictors in the tests I ran to perform very slightly worse (about 1% worse) than the previous code using `Semaphores`, even after tweaking the `BlockingQueue` size to optimize its performance. This was a surprise to me, but something in the queueing and/or synchronization code of `BlockingQueue` appears to perform slightly worse than using the `Semaphores` directly.

A `BlockingQueue` maximum size of 1 would be perfectly adequate for this application. (The `Semaphore`-based code just passed one value at a time between threads.) In a test with a realistically large system, a max queue size of 5 performed better than 1 or 2. It also performed better than 10 or 20. It appears that having more than 1 or 2 entries benefits from the parallelism of moving the next values from the `SqlRowSet` into the queue at the same time as earlier values are being processed by the prediction engine on the other thread. But it appears that having 10 or 20 entries has no further benefit from the parallelism while requiring more time to manage the queue. The implementation uses a max queue size of 5 for best performance, which still makes it about 1% slower than the `Semaphore` implementation. This is a very small price to pay for code that is more reliable and maintainable.

The `BlockingQueue` used is `ArrayBlockingQueue`. I also tried `LinkedBlockingQueue`, but it performed less well in the tests I ran.
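For reference, these are the two constructions being compared, with the capacity of 5 that performed best in the tests above; the constant and field names here are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueChoiceSketch
{
    /** Max queue size that performed best in the tests described above (name is illustrative). */
    static final int QUEUE_CAPACITY = 5;

    /** Array-backed, bounded queue: the variant this refactor uses. */
    BlockingQueue<Object> arrayBacked = new ArrayBlockingQueue<>( QUEUE_CAPACITY );

    /** Linked-node queue with the same capacity bound: also tried, but performed less well here. */
    BlockingQueue<Object> linkedNodes = new LinkedBlockingQueue<>( QUEUE_CAPACITY );
}
```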