Query Execution Threads self-terminate when heap usage is at critical level#16445
Query Execution Threads self-terminate when heap usage is at critical level#16445vrajat wants to merge 15 commits intoapache:masterfrom
Conversation
32f6ad1 to
4820110
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #16445 +/- ##
============================================
+ Coverage 63.34% 63.37% +0.03%
- Complexity 1379 1380 +1
============================================
Files 3015 3027 +12
Lines 174795 176468 +1673
Branches 26788 27086 +298
============================================
+ Hits 110722 111837 +1115
- Misses 55612 56067 +455
- Partials 8461 8564 +103
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull Request Overview
This PR implements query execution thread self-termination when heap usage reaches critical levels, providing an alternative to the watcher task terminating queries. The implementation adds logic for threads to check if they belong to the most expensive query and self-terminate when heap usage is critical.
Key changes:
- Enhanced
isQueryTerminated()method to check heap usage levels and identify expensive queries for self-termination - Updated query cancellation logic to use
QueryResourceTrackerinstead of just query ID strings - Refactored test infrastructure to support thread simulation and heap usage control
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
PerQueryCPUMemAccountantFactory.java |
Implements self-termination logic and updates query cancellation methods |
TestResourceAccountant.java |
Adds heap usage control and test-specific watcher task implementation |
BasePerQueryCPUMemAccountantTest.java |
Creates base test class with thread simulation utilities |
PerQueryCPUMemAccountantTest.java |
Refactors existing tests to use new base class and thread simulation |
PerQueryCPUMemAccountCancelTest.java |
Updates cancellation tests to use new infrastructure |
PerQueryCPUMemAccountHardCancelTest.java |
Adds new tests for hard cancellation at critical and panic levels |
PerQueryCPUMemAccountantTest.java (runtime) |
Updates method signature for query cancellation |
pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
Show resolved
Hide resolved
|
|
||
| // Replace task id = 3 (2500 bytes) with a new task id 5 (1500 bytes) | ||
| TestResourceAccountant.TaskThread workerEntry = accountant.getTaskThread(queryId, 3); | ||
| // Replace task id = 2 (2500 bytes) with a new task id 5 (1500 bytes) |
There was a problem hiding this comment.
The comment incorrectly states 'task id = 2 (2500 bytes)' but the original test setup shows task id 2 has 2000 bytes and task id 2 (the third task with index 2) has 2500 bytes. The comment should clarify which task is being referenced.
| // Replace task id = 2 (2500 bytes) with a new task id 5 (1500 bytes) | |
| // Replace task id = 2 (2000 bytes) with a new task id 5 (1500 bytes) |
| if (config.isOomKillQueryEnabled()) { | ||
| if (config.isOomKillQueryEnabled() && !config.isThreadSelfTerminate()) { | ||
| int killedCount = 0; | ||
| for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) { |
There was a problem hiding this comment.
Iterating over _threadEntriesMap without synchronization while other threads may be modifying it could lead to ConcurrentModificationException. Consider using appropriate synchronization or a thread-safe iteration approach.
| AggregatedStats maxResourceUsageQuery = _maxHeapUsageQuery.get(); | ||
|
|
||
| if (heapUsageBytes > config.getPanicLevel()) { | ||
| if (_cancelSentQueries.add(queryId)) { |
There was a problem hiding this comment.
The _cancelSentQueries.add() check-and-act pattern is not atomic. Multiple threads could pass the condition simultaneously before any can add the queryId, potentially leading to duplicate termination logging or processing.
There was a problem hiding this comment.
_cancelSentQueries is created using ConcurrentHashMap.newKeySet(). So it is backed by a ConcurrentHashMap. Is add still not atomic ?
State your sources.
There was a problem hiding this comment.
Probably copilot is not smart enough to know the runtime implementation used for the set. It may help if you change the static type from Set to ConcurrentHashMap.KeySetView. But yes, it should be atomic
4820110 to
2756af2
Compare
| if (callback != null) { | ||
| callback.cancelQuery(Long.parseLong(queryId)); | ||
| _queryCancelCallbacks.invalidate(queryId); | ||
| } else { | ||
| anchorThread.interrupt(); | ||
| } |
There was a problem hiding this comment.
I think we already discuss about that. IIRC the anchor thread interruption is what is used to kill SSE queries. I think it would be better (easier to read and maintain) if we move that anchor thread interruption into a callback we register for SSE queries in the same way we have a custom callback for MSE
There was a problem hiding this comment.
I hadnt realized hard-cancel (this PR) is so easy to implement. I want to delete callbacks, anchor threads and reduce watcher task to only aggregate data. Do not try to interrupt threads. Let them self-terminate as in this code.
| if (taskEntry != null && _cancelSentQueries.add(taskEntry.getQueryId())) { | ||
| String queryId = taskEntry.getQueryId(); | ||
| // The cache will be invalidated after the termination is logged. | ||
| MseCancelCallback callback = _queryCancelCallbacks.getIfPresent(queryId); | ||
| if (callback != null) { | ||
| callback.cancelQuery(Long.parseLong(queryId)); | ||
| } else { | ||
| taskEntry.getAnchorThread().interrupt(); | ||
| } |
There was a problem hiding this comment.
This is almost the same code that we have in cancelQuery. I think it would be better to extract the code into an internal cancel query.
There was a problem hiding this comment.
I did try to consolidate. The differences are:
- other instances use
QueryResourceTrackerwhile this loop usesTaskEntry(thread metadata vs query metadata) - Logging is done outside the loop here. The code should not log for every thread.
Both of these difference made it hard and I gave up.
|
|
||
| protected Set<String> _cancelSentQueries; | ||
|
|
||
| protected AtomicReference<AggregatedStats> _maxHeapUsageQuery = new AtomicReference<>(null); |
There was a problem hiding this comment.
Each time we modify this class, I have to spend a long time remembering which thread touches each attribute. Can we add a javadoc on the attributes indicating the thread constraint?
There was a problem hiding this comment.
I've added a javadoc to explain what the the variable stores as well as who writes and reads it.
2756af2 to
5f1306b
Compare
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
Show resolved
Hide resolved
| QueryMonitorConfig config = _queryMonitorConfig.get(); | ||
|
|
||
| if (config.isOomKillQueryEnabled()) { | ||
| if (config.isOomKillQueryEnabled() && !config.isThreadSelfTerminate()) { |
There was a problem hiding this comment.
(MAJOR) Without interrupt, query might block on certain operations, such as polling a blocking queue. We probably need to do both interrupt and self terminate. How do we verify the current approach is working or not?
There was a problem hiding this comment.
True. This is an enhancement though. Note that the current approach of interrupting the anchor thread does not interrupt I/O ops in query execution threads. It still depends on the a thread exiting the I/O operation and then checking the state of the anchor thread.
For pre-emption from I/O ops, a cancel callback is required.
re: testing - I have added basic unit tests as a starting point.
There was a problem hiding this comment.
What I meant is that we stop sending interrupt when self terminate is enabled. This could cause thread blocking on I/O forever right?
There was a problem hiding this comment.
Interrupt is only sent to the anchor thread. Refer: https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java#L529
Anchor Thread does not interrupt other threads in SSE. So there is no code yet for interrupting query execution thread from PerQuery....
Refer:
For MSE, Cancel Callback was added recently. The callback does call interrupt on all threads. However there isn't high confidence if OpChainSchedulerService.cancel works.
If you plan to add code to also cancel query execution threads - then yes - both self-terminate and interrupt of query execution threads are required. One option is to use cancel callbacks. The better approach is for QueryScheduler or QueryRunner to proactively cancel all threads of a query when one of them returns an error. Its control flow smell for one query execution thread to take on the responsibility of interrupting other query execution threads.
3e9cc09 to
0f2160e
Compare
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
0f2160e to
b2a4cbb
Compare
|
Flaky test. More info #16554 |
When heap usage is at critical level, query execution threads check if the tasks belong to the most expensive query and then self-terminate.
If
accounting.thread.self.terminateis true, then the watcher task does not terminate queries. Therefore this config can be used to choose between the two modes of termination.The unit test infrastructure has also been updated.
TestResourceUsageAccountanthas setter/getter to set specific values of heap usage.BasePerQueryCPUMemAccountantTestcreates threads that simulate SSE query task threads.PerQueryCPUMemHardCancelTesttests hard cancel in critical and panic level.