Skip to content

Query Execution Threads self-terminate when heap usage is at critical level#16445

Open
vrajat wants to merge 15 commits intoapache:masterfrom
vrajat:rv-tracing-critical-hard-kill
Open

Query Execution Threads self-terminate when heap usage is at critical level#16445
vrajat wants to merge 15 commits intoapache:masterfrom
vrajat:rv-tracing-critical-hard-kill

Conversation

@vrajat
Copy link
Contributor

@vrajat vrajat commented Jul 29, 2025

When heap usage is at critical level, query execution threads check if the tasks belong to the most expensive query and then self-terminate.

If accounting.thread.self.terminate is true, then the watcher task does not terminate queries. Therefore this config can be used to choose between the two modes of termination.

The unit test infrastructure has also been updated.

  • TestResourceUsageAccountant has setter/getter to set specific values of heap usage.
  • BasePerQueryCPUMemAccountantTest creates threads that simulate SSE query task threads.
  • PerQueryCPUMemHardCancelTest tests hard cancel in critical and panic level.

@vrajat vrajat requested review from Copilot and xiangfu0 July 29, 2025 11:48

This comment was marked as outdated.

@vrajat vrajat marked this pull request as ready for review July 30, 2025 07:39
@vrajat vrajat force-pushed the rv-tracing-critical-hard-kill branch from 32f6ad1 to 4820110 Compare July 30, 2025 07:47
@codecov-commenter
Copy link

codecov-commenter commented Jul 30, 2025

Codecov Report

❌ Patch coverage is 71.64179% with 19 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.37%. Comparing base (058f7aa) to head (fe0d589).
⚠️ Report is 21 commits behind head on master.

Files with missing lines Patch % Lines
...re/accounting/PerQueryCPUMemAccountantFactory.java 71.64% 8 Missing and 11 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16445      +/-   ##
============================================
+ Coverage     63.34%   63.37%   +0.03%     
- Complexity     1379     1380       +1     
============================================
  Files          3015     3027      +12     
  Lines        174795   176468    +1673     
  Branches      26788    27086     +298     
============================================
+ Hits         110722   111837    +1115     
- Misses        55612    56067     +455     
- Partials       8461     8564     +103     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.33% <71.64%> (+0.02%) ⬆️
java-21 63.28% <70.14%> (-0.02%) ⬇️
temurin 63.37% <71.64%> (+0.03%) ⬆️
unittests 63.37% <71.64%> (+0.03%) ⬆️
unittests1 56.47% <71.64%> (+0.06%) ⬆️
unittests2 33.18% <0.00%> (-0.22%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@vrajat vrajat requested review from Copilot and gortiz July 30, 2025 12:40
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements query execution thread self-termination when heap usage reaches critical levels, providing an alternative to the watcher task terminating queries. The implementation adds logic for threads to check if they belong to the most expensive query and self-terminate when heap usage is critical.

Key changes:

  • Enhanced isQueryTerminated() method to check heap usage levels and identify expensive queries for self-termination
  • Updated query cancellation logic to use QueryResourceTracker instead of just query ID strings
  • Refactored test infrastructure to support thread simulation and heap usage control

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
PerQueryCPUMemAccountantFactory.java Implements self-termination logic and updates query cancellation methods
TestResourceAccountant.java Adds heap usage control and test-specific watcher task implementation
BasePerQueryCPUMemAccountantTest.java Creates base test class with thread simulation utilities
PerQueryCPUMemAccountantTest.java Refactors existing tests to use new base class and thread simulation
PerQueryCPUMemAccountCancelTest.java Updates cancellation tests to use new infrastructure
PerQueryCPUMemAccountHardCancelTest.java Adds new tests for hard cancellation at critical and panic levels
PerQueryCPUMemAccountantTest.java (runtime) Updates method signature for query cancellation


// Replace task id = 3 (2500 bytes) with a new task id 5 (1500 bytes)
TestResourceAccountant.TaskThread workerEntry = accountant.getTaskThread(queryId, 3);
// Replace task id = 2 (2500 bytes) with a new task id 5 (1500 bytes)
Copy link

Copilot AI Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment incorrectly states 'task id = 2 (2500 bytes)' but the original test setup shows task id 2 has 2000 bytes and task id 2 (the third task with index 2) has 2500 bytes. The comment should clarify which task is being referenced.

Suggested change
// Replace task id = 2 (2500 bytes) with a new task id 5 (1500 bytes)
// Replace task id = 2 (2000 bytes) with a new task id 5 (1500 bytes)

Copilot uses AI. Check for mistakes.
if (config.isOomKillQueryEnabled()) {
if (config.isOomKillQueryEnabled() && !config.isThreadSelfTerminate()) {
int killedCount = 0;
for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) {
Copy link

Copilot AI Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterating over _threadEntriesMap without synchronization while other threads may be modifying it could lead to ConcurrentModificationException. Consider using appropriate synchronization or a thread-safe iteration approach.

Copilot uses AI. Check for mistakes.
AggregatedStats maxResourceUsageQuery = _maxHeapUsageQuery.get();

if (heapUsageBytes > config.getPanicLevel()) {
if (_cancelSentQueries.add(queryId)) {
Copy link

Copilot AI Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _cancelSentQueries.add() check-and-act pattern is not atomic. Multiple threads could pass the condition simultaneously before any can add the queryId, potentially leading to duplicate termination logging or processing.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_cancelSentQueries is created using ConcurrentHashMap.newKeySet(). So it is backed by a ConcurrentHashMap. Is add still not atomic ?

State your sources.

Copy link
Contributor

@gortiz gortiz Aug 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably copilot is not smart enough to know the runtime implementation used for the set. It may help if you change the static type from Set to ConcurrentHashMap.KeySetView. But yes, it should be atomic

@vrajat vrajat force-pushed the rv-tracing-critical-hard-kill branch from 4820110 to 2756af2 Compare August 4, 2025 03:32
@vrajat vrajat requested a review from Jackie-Jiang August 4, 2025 03:33
Comment on lines +567 to +561
if (callback != null) {
callback.cancelQuery(Long.parseLong(queryId));
_queryCancelCallbacks.invalidate(queryId);
} else {
anchorThread.interrupt();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already discuss about that. IIRC the anchor thread interruption is what is used to kill SSE queries. I think it would be better (easier to read and maintain) if we move that anchor thread interruption into a callback we register for SSE queries in the same way we have a custom callback for MSE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadnt realized hard-cancel (this PR) is so easy to implement. I want to delete callbacks, anchor threads and reduce watcher task to only aggregate data. Do not try to interrupt threads. Let them self-terminate as in this code.

Comment on lines +909 to +911
if (taskEntry != null && _cancelSentQueries.add(taskEntry.getQueryId())) {
String queryId = taskEntry.getQueryId();
// The cache will be invalidated after the termination is logged.
MseCancelCallback callback = _queryCancelCallbacks.getIfPresent(queryId);
if (callback != null) {
callback.cancelQuery(Long.parseLong(queryId));
} else {
taskEntry.getAnchorThread().interrupt();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost the same code that we have in cancelQuery. I think it would be better to extract the code into an internal cancel query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did try to consolidate. The differences are:

  • other instances use QueryResourceTracker while this loop uses TaskEntry (thread metadata vs query metadata)
  • Logging is done outside the loop here. The code should not log for every thread.

Both of these difference made it hard and I gave up.


protected Set<String> _cancelSentQueries;

protected AtomicReference<AggregatedStats> _maxHeapUsageQuery = new AtomicReference<>(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each time we modify this class, I have to spend a long time remembering which thread touches each attribute. Can we add a javadoc on the attributes indicating the thread constraint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a javadoc to explain what the the variable stores as well as who writes and reads it.

@vrajat vrajat force-pushed the rv-tracing-critical-hard-kill branch from 2756af2 to 5f1306b Compare August 4, 2025 15:01
QueryMonitorConfig config = _queryMonitorConfig.get();

if (config.isOomKillQueryEnabled()) {
if (config.isOomKillQueryEnabled() && !config.isThreadSelfTerminate()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) Without interrupt, query might block on certain operations, such as polling a blocking queue. We probably need to do both interrupt and self terminate. How do we verify the current approach is working or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. This is an enhancement though. Note that the current approach of interrupting the anchor thread does not interrupt I/O ops in query execution threads. It still depends on the a thread exiting the I/O operation and then checking the state of the anchor thread.

For pre-emption from I/O ops, a cancel callback is required.

re: testing - I have added basic unit tests as a starting point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant is that we stop sending interrupt when self terminate is enabled. This could cause thread blocking on I/O forever right?

Copy link
Contributor Author

@vrajat vrajat Aug 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interrupt is only sent to the anchor thread. Refer: https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java#L529

Anchor Thread does not interrupt other threads in SSE. So there is no code yet for interrupting query execution thread from PerQuery....

Refer:

Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(), workloadName);

For MSE, Cancel Callback was added recently. The callback does call interrupt on all threads. However there isn't high confidence if OpChainSchedulerService.cancel works.

Refer: https://github.com/apache/pinot/blob/master/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java#L120

If you plan to add code to also cancel query execution threads - then yes - both self-terminate and interrupt of query execution threads are required. One option is to use cancel callbacks. The better approach is for QueryScheduler or QueryRunner to proactively cancel all threads of a query when one of them returns an error. Its control flow smell for one query execution thread to take on the responsibility of interrupting other query execution threads.

@vrajat vrajat force-pushed the rv-tracing-critical-hard-kill branch from 3e9cc09 to 0f2160e Compare August 14, 2025 06:00
@vrajat vrajat force-pushed the rv-tracing-critical-hard-kill branch from 0f2160e to b2a4cbb Compare August 14, 2025 06:14
@vrajat
Copy link
Contributor Author

vrajat commented Aug 18, 2025

Flaky test. More info #16554

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants