Add proactive per-query memory checking to prevent OOM #16378

Open
xiangfu0 wants to merge 8 commits into apache:master from xiangfu0:query-sucide-when-oom

@xiangfu0 xiangfu0 commented Jul 18, 2025

Summary

This PR adds a new feature that allows operators to examine per-thread memory usage and proactively terminate queries when they exceed configured memory limits, preventing system-wide OOM issues.

Key Changes

  • Add checkMemoryAndInterruptIfExceeded() to ThreadResourceUsageAccountant interface
  • Implement memory checking logic in PerQueryCPUMemAccountantFactory
  • Add configuration options for enabling and setting per-query memory limits
  • Integrate memory checks into BaseOperator and MultiStageOperator nextBlock() calls
  • Add ThreadAccountantOps.checkMemoryAndInterruptIfExceeded() utility method
  • Add comprehensive unit tests covering all scenarios
  • Make default memory limit adaptive (1/3 of heap size instead of fixed 1GB)

Configuration

  • accounting.per.query.memory.check.enabled (default: false)
  • accounting.per.query.memory.limit.bytes (default: adaptive based on heap size)

Adaptive Default Memory Limit

The default memory limit now dynamically scales with the JVM heap size:

  • Small deployments (6GB heap): 2GB per-query limit
  • Medium deployments (12GB heap): 4GB per-query limit
  • Large deployments (24GB heap): 8GB per-query limit

This provides better resource utilization while maintaining the ability to override via configuration.
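
The arithmetic behind these figures can be sketched as follows. This is a minimal illustration that assumes the plain "one third of max heap" rule stated above; the class and method names are hypothetical and are not taken from the PR.

```java
// Hypothetical helper: class and method names are illustrative, not from the PR.
final class AdaptiveMemoryLimitSketch {
  private AdaptiveMemoryLimitSketch() {
  }

  /** Heap-derived default: one third of the maximum heap size, in bytes. */
  static long defaultLimitBytes(long maxHeapBytes) {
    return maxHeapBytes / 3;
  }

  /** An explicitly configured limit wins; null means "not set". */
  static long resolveLimitBytes(Long configuredLimitBytes, long maxHeapBytes) {
    return configuredLimitBytes != null ? configuredLimitBytes : defaultLimitBytes(maxHeapBytes);
  }

  public static void main(String[] args) {
    long gib = 1L << 30;
    for (long heapGib : new long[] {6, 12, 24}) {
      System.out.println(heapGib + " GiB heap -> " + defaultLimitBytes(heapGib * gib) / gib + " GiB limit");
    }
    // Uses the heap size of the JVM running this sketch.
    System.out.println("this JVM -> " + resolveLimitBytes(null, Runtime.getRuntime().maxMemory()) + " bytes");
  }
}
```

Passing a non-null configured value to `resolveLimitBytes` models the override via configuration.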

Usage

Operators now automatically check memory usage on each nextBlock() call and will terminate queries with detailed error messages if limits are exceeded.
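
A rough sketch of what a check on each nextBlock() call could look like is shown below. It is not the PR's code: the operator base class, the exception type, and the `LongSupplier` that stands in for the thread's allocation counter are all hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-ins: none of these types are Pinot classes.
final class MemoryLimitExceededSketchException extends RuntimeException {
  MemoryLimitExceededSketchException(String message) {
    super(message);
  }
}

abstract class MemoryCheckedOperatorSketch<T> {
  private final LongSupplier _allocatedBytes; // assumed source of per-thread allocated bytes
  private final long _limitBytes;
  private final boolean _checkEnabled;

  MemoryCheckedOperatorSketch(LongSupplier allocatedBytes, long limitBytes, boolean checkEnabled) {
    _allocatedBytes = allocatedBytes;
    _limitBytes = limitBytes;
    _checkEnabled = checkEnabled;
  }

  /** Template method: check the memory budget first, then produce the next block. */
  final T nextBlock() {
    if (_checkEnabled) {
      long used = _allocatedBytes.getAsLong();
      if (used > _limitBytes) {
        throw new MemoryLimitExceededSketchException(
            "Thread allocated " + used + " bytes, exceeding the limit of " + _limitBytes + " bytes");
      }
    }
    return getNextBlock();
  }

  protected abstract T getNextBlock();
}
```

With the check disabled, `nextBlock()` simply delegates to `getNextBlock()`, which matches the feature being off by default.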

Testing

  • Added comprehensive unit tests in PerQueryCPUMemAccountantTest
  • Tests cover enabled/disabled scenarios, memory limit enforcement, and edge cases
  • All existing tests continue to pass

Configuration Examples

Enable with Dynamic Limits (Recommended)

accounting.per.query.memory.check.enabled=true
# accounting.per.query.memory.limit.bytes not set - uses dynamic calculation

Enable with Fixed Limit

accounting.per.query.memory.check.enabled=true
accounting.per.query.memory.limit.bytes=2147483648  # 2GB fixed limit

Disable (Default)

accounting.per.query.memory.check.enabled=false

@xiangfu0 xiangfu0 requested a review from Copilot July 18, 2025 01:42

Copilot AI left a comment

Pull Request Overview

This PR implements proactive per-query memory checking to prevent OOM issues by allowing operators to monitor memory usage and terminate queries that exceed configured limits. The feature provides adaptive default memory limits based on JVM heap size and comprehensive configuration options.

  • Adds memory threshold checking functionality to the ThreadResourceUsageAccountant interface
  • Integrates memory checks into operator execution paths (BaseOperator and MultiStageOperator)
  • Implements configurable per-query memory limits with adaptive defaults (1/3 of heap size)

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File | Description
CommonConstants.java | Adds configuration constants for enabling per-query memory checks and setting memory limits
ThreadResourceUsageAccountant.java | Adds checkMemoryAndInterruptIfExceeded() method to the interface
Tracing.java | Adds utility methods for memory checking and thread interruption
PerQueryCPUMemAccountantFactory.java | Implements core memory checking logic with error handling and thread interruption
BaseOperator.java | Integrates memory checks into the nextBlock() execution flow
MultiStageOperator.java | Integrates memory checks into the nextBlock() execution flow
PerQueryCPUMemAccountantTest.java | Adds comprehensive unit tests for all memory checking scenarios
Comments suppressed due to low confidence (1)

pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java:395

  • [nitpick] The method name 'interruptRunnerThread' is ambiguous - it's unclear what constitutes a 'runner thread'. Consider renaming to 'interruptAnchorThread' or 'interruptQueryThread' to better reflect its purpose.
    public static void interruptRunnerThread() {

codecov-commenter commented Jul 18, 2025

Codecov Report

Attention: Patch coverage is 45.69191% with 208 lines in your changes missing coverage. Please review.

Project coverage is 63.17%. Comparing base (1a476de) to head (be9fb8a).
Report is 506 commits behind head on master.

Files with missing lines | Patch % | Lines
.../executor/ThrottleOnCriticalHeapUsageExecutor.java | 35.71% | 108 Missing and 9 partials ⚠️
...re/accounting/PerQueryCPUMemAccountantFactory.java | 54.54% | 57 Missing and 18 partials ⚠️
...ache/pinot/core/accounting/QueryMonitorConfig.java | 44.44% | 8 Missing and 2 partials ⚠️
.../main/java/org/apache/pinot/spi/trace/Tracing.java | 44.44% | 4 Missing and 1 partial ⚠️
.../apache/pinot/core/accounting/QueryAggregator.java | 50.00% | 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16378      +/-   ##
============================================
+ Coverage     62.90%   63.17%   +0.26%     
+ Complexity     1386     1363      -23     
============================================
  Files          2867     3006     +139     
  Lines        163354   174403   +11049     
  Branches      24952    26680    +1728     
============================================
+ Hits         102755   110174    +7419     
- Misses        52847    55813    +2966     
- Partials       7752     8416     +664     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 ?
java-11 63.12% <45.16%> (+0.25%) ⬆️
java-21 63.13% <44.90%> (+0.31%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.17% <45.69%> (+0.26%) ⬆️
unittests 63.16% <45.69%> (+0.26%) ⬆️
unittests1 56.28% <45.69%> (+0.45%) ⬆️
unittests2 33.12% <0.00%> (-0.45%) ⬇️

@xiangfu0 xiangfu0 requested review from Jackie-Jiang and vrajat July 18, 2025 02:17
@xiangfu0 xiangfu0 added Configuration Config changes (addition/deletion/change in behavior) query labels Jul 18, 2025

@vrajat vrajat left a comment

My suggestion: do not expose this feature in ThreadResourceUsageAccountant. Instead, add a generic function to that interface, isQueryTerminated(), which can check all the conditions based on the capabilities of the Accountant.

Then PerQueryCPUMemAccountant and ResourceUsageAccountant can evolve independently.

return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
}

public void checkMemoryAndInterruptIfExceeded() {

Instead of interrupting the anchor thread, this function can be called from Tracing.ThreadAccountantOps.isInterrupted(). Specifically:

return Tracing.getThreadAccountant().checkMemoryAndInterruptIfExceeded() || Thread.interrupted() || Tracing.getThreadAccountant().isAnchorThreadInterrupted();

A better long-term solution is to change `Tracing.ThreadAccountantOps.isInterrupted()` to

return Thread.interrupted() || Tracing.getThreadAccountant().isQueryTerminated();

ThreadResourceUsageAccountant.isQueryTerminated() is a new function:

return checkMemoryAndInterruptIfExceeded() || isAnchorThreadInterrupted()

The advantage is that new conditions can be added depending on the capability of the accountant such as also interrupting in panic mode.
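
The shape suggested in this comment could be sketched as follows. The interface and class names are hypothetical stand-ins rather than the Pinot API, and the memory condition is reduced to a simple threshold comparison for illustration.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical interface mirroring the shape suggested in the comment; not the Pinot API.
interface AccountantSketch {
  boolean isAnchorThreadInterrupted();

  /** Default: an accountant without extra capabilities only reports the anchor-thread state. */
  default boolean isQueryTerminated() {
    return isAnchorThreadInterrupted();
  }
}

// An accountant that also knows about memory adds its own condition.
final class MemoryAwareAccountantSketch implements AccountantSketch {
  private final LongSupplier _allocatedBytes;
  private final long _limitBytes;
  private final BooleanSupplier _anchorInterrupted;

  MemoryAwareAccountantSketch(LongSupplier allocatedBytes, long limitBytes, BooleanSupplier anchorInterrupted) {
    _allocatedBytes = allocatedBytes;
    _limitBytes = limitBytes;
    _anchorInterrupted = anchorInterrupted;
  }

  @Override
  public boolean isAnchorThreadInterrupted() {
    return _anchorInterrupted.getAsBoolean();
  }

  @Override
  public boolean isQueryTerminated() {
    return _allocatedBytes.getAsLong() > _limitBytes || isAnchorThreadInterrupted();
  }
}

final class InterruptCheckSketch {
  private InterruptCheckSketch() {
  }

  /** Single call site: callers do not need to know which conditions the accountant supports. */
  static boolean isInterrupted(AccountantSketch accountant) {
    return Thread.interrupted() || accountant.isQueryTerminated();
  }
}
```

New termination conditions would then be added inside an accountant's `isQueryTerminated()` without touching the call site.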

public static final String CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK = "accounting.per.query.memory.check.enabled";
public static final boolean DEFAULT_ENABLE_PER_QUERY_MEMORY_CHECK = false;

public static final String CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES = "accounting.per.query.memory.limit.bytes";

Should "thread" be in the name, since only a single thread's allocation is checked? Or should the capability be improved later to consider the aggregate across threads?

vrajat commented Jul 18, 2025

Check #16380 for an example of using isQueryTerminated

@xiangfu0 xiangfu0 force-pushed the query-sucide-when-oom branch 4 times, most recently from 0598e36 to 468ab42 Compare July 21, 2025 03:17
throw new EarlyTerminationException("Interrupted while processing next block");
}

// Check per-thread memory usage and terminate query proactively if threshold exceeded

Could this be overly chatty for light operators that iterate in a tight loop (e.g., FilterOperator on a single segment)? Would either of the following additions make sense?
1. Short-circuiting if the thread’s memory sample hasn’t changed since last check.
2. Sampling every N blocks (configurable) instead of every block.

Even though each step is cheap, doing it 1000 times for a query that finishes in ~10 ms adds measurable overhead (extra branches, TLS lookups, potential volatile reads).
For heavier operators (joins, aggregations) the added cost is negligible, but for “light” ones the ratio might be higher.
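
Option 2 could look roughly like the sketch below. It assumes one instance per worker thread (it is not thread-safe), and the class name, the sampling interval parameter, and the `LongSupplier` allocation source are all hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical per-thread helper: samples memory only on every Nth block.
final class SampledMemoryCheckSketch {
  private final LongSupplier _allocatedBytes; // assumed source of per-thread allocated bytes
  private final long _limitBytes;
  private final int _sampleInterval;
  private int _blocksSinceLastCheck;
  private int _checksPerformed;

  SampledMemoryCheckSketch(LongSupplier allocatedBytes, long limitBytes, int sampleInterval) {
    if (sampleInterval < 1) {
      throw new IllegalArgumentException("sampleInterval must be >= 1");
    }
    _allocatedBytes = allocatedBytes;
    _limitBytes = limitBytes;
    _sampleInterval = sampleInterval;
  }

  /** Call once per block; returns true when a sampled reading exceeds the limit. */
  boolean shouldTerminate() {
    if (++_blocksSinceLastCheck < _sampleInterval) {
      return false; // cheap path: an increment and a compare
    }
    _blocksSinceLastCheck = 0;
    _checksPerformed++;
    return _allocatedBytes.getAsLong() > _limitBytes;
  }

  int checksPerformed() {
    return _checksPerformed;
  }
}
```

The trade-off is that a query can overshoot the limit by up to N-1 blocks before the next sample notices.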

CommonConstants.Accounting.DEFAULT_PER_THREAD_QUERY_MEMORY_LIMIT_BYTES);
// If using default value, dynamically calculate based on actual heap size for better defaults
if (configuredLimit == CommonConstants.Accounting.DEFAULT_PER_THREAD_QUERY_MEMORY_LIMIT_BYTES) {
long maxHeapMemory = Runtime.getRuntime().maxMemory();

Inside a cgroup, memory.limit_in_bytes is sometimes smaller than the JVM -Xmx.

When computing the default, is there a way to compare maxMemory() with /sys/fs/cgroup/memory.max, if present?
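
One way to do that comparison is sketched below, under the assumption that the cgroup v2 file contains either a byte count or the literal string `max` (meaning no limit). The class and method names are hypothetical, and the cgroup v1 file mentioned above would need its own path.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

// Hypothetical helper; not part of the PR.
final class CgroupAwareHeapSketch {
  private CgroupAwareHeapSketch() {
  }

  /** Parses the file content; "max", empty, or malformed input means "no usable limit". */
  static OptionalLong parseCgroupLimit(String raw) {
    String value = raw.trim();
    if (value.isEmpty() || value.equals("max")) {
      return OptionalLong.empty();
    }
    try {
      long limit = Long.parseLong(value);
      return limit > 0 ? OptionalLong.of(limit) : OptionalLong.empty();
    } catch (NumberFormatException e) {
      return OptionalLong.empty();
    }
  }

  /** Reads the limit file if it exists and is readable, e.g. /sys/fs/cgroup/memory.max. */
  static OptionalLong readCgroupLimit(Path path) {
    try {
      if (!Files.isReadable(path)) {
        return OptionalLong.empty();
      }
      return parseCgroupLimit(new String(Files.readAllBytes(path), StandardCharsets.UTF_8));
    } catch (IOException e) {
      return OptionalLong.empty();
    }
  }

  /** The smaller of the JVM max heap and the cgroup limit, when a limit is known. */
  static long effectiveMemoryBytes(long jvmMaxBytes, OptionalLong cgroupLimit) {
    return cgroupLimit.isPresent() ? Math.min(jvmMaxBytes, cgroupLimit.getAsLong()) : jvmMaxBytes;
  }
}
```

A caller could combine these as `effectiveMemoryBytes(Runtime.getRuntime().maxMemory(), readCgroupLimit(Paths.get("/sys/fs/cgroup/memory.max")))` and derive the default from the result.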

} else {
// Wait and let the outer loop re-evaluate - pause 100ms before next iteration
try {
Thread.sleep(100);

Do we want to consider exponential backoff?

It might reduce the latency penalty if memory pressure clears quickly.
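
A minimal sketch of such a delay schedule follows: start small, double on each wait, and cap at a maximum. The class name and the example values are hypothetical, not taken from the PR.

```java
// Hypothetical helper producing a capped, doubling delay sequence.
final class ExponentialBackoffSketch {
  private final long _initialMillis;
  private final long _maxMillis;
  private long _nextMillis;

  ExponentialBackoffSketch(long initialMillis, long maxMillis) {
    if (initialMillis <= 0 || maxMillis < initialMillis) {
      throw new IllegalArgumentException("require 0 < initialMillis <= maxMillis");
    }
    _initialMillis = initialMillis;
    _maxMillis = maxMillis;
    _nextMillis = initialMillis;
  }

  /** Returns the delay to sleep now and doubles the following one, capped at the maximum. */
  long nextDelayMillis() {
    long delay = _nextMillis;
    // Comparing against max / 2 avoids overflow when doubling.
    _nextMillis = delay > _maxMillis / 2 ? _maxMillis : delay * 2;
    return delay;
  }

  /** Call once memory pressure has cleared so the next episode starts small again. */
  void reset() {
    _nextMillis = _initialMillis;
  }
}
```

In the wait loop, `Thread.sleep(100)` would become `Thread.sleep(backoff.nextDelayMillis())`, so a short burst of pressure costs only the first few small delays.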

@xiangfu0 xiangfu0 force-pushed the query-sucide-when-oom branch 7 times, most recently from 27d1542 to 10917ca Compare July 24, 2025 06:04
xiangfu0 added 7 commits July 24, 2025 03:35
This commit adds a new feature that allows operators to examine per-thread
memory usage and proactively terminate queries when they exceed configured
memory limits, preventing system-wide OOM issues.

Key Changes:
- Add checkMemoryAndInterruptIfExceeded() to ThreadResourceUsageAccountant interface
- Implement memory checking logic in PerQueryCPUMemAccountantFactory
- Add configuration options for enabling and setting per-query memory limits
- Integrate memory checks into BaseOperator and MultiStageOperator nextBlock() calls
- Add ThreadAccountantOps.checkMemoryAndInterruptIfExceeded() utility method
- Add comprehensive unit tests covering all scenarios

Configuration:
- accounting.per.query.memory.check.enabled (default: false)
- accounting.per.query.memory.limit.bytes (default: 1GB)

Usage:
Operators now automatically check memory usage on each nextBlock() call
and will terminate queries with detailed error messages if limits are exceeded.
Changed DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES from fixed 1GB to
dynamically calculated Runtime.getRuntime().maxMemory() / 3.

This makes the default memory limit adaptive to the actual
JVM heap size, providing better resource utilization and
preventing OOM issues more effectively.

- More appropriate for different deployment sizes
- Scales with available memory
- Maintains backward compatibility through configuration
…s method

- Fix Runtime.getRuntime().maxMemory() at class loading time issue:
  * Change DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES to fixed 512MB default
  * Add dynamic calculation in constructor using 1/3 heap size (capped at 2GB)
  * Only use dynamic calculation when using default value

- Rename ambiguous method interruptRunnerThread to interruptAnchorThread:
  * More clearly indicates it interrupts the anchor thread
  * Updated all references in QueryAggregator and Tracing classes
… and isQueryTerminated method

- Add per-thread query memory configurations to QueryMonitorConfig for runtime changeability
- Rename configurations to include 'thread' for clarity (CONFIG_OF_PER_THREAD_QUERY_MEMORY_*)
- Add isQueryTerminated() method to ThreadResourceUsageAccountant interface
- Implement isQueryTerminated() in PerQueryCPUMemAccountantFactory and ResourceUsageAccountantFactory
- Update Tracing.ThreadAccountantOps.isInterrupted() to check for query termination
- Update checkMemoryAndInterruptIfExceeded() to use QueryMonitorConfig for runtime settings
- Update error messages to use 'per-thread' terminology for accuracy
- Update all tests and utilities to use new configuration constants
- Remove CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK and DEFAULT_ENABLE_PER_QUERY_MEMORY_CHECK
- Remove CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES and DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES
- These constants have been replaced by per-thread equivalents for better clarity
- No remaining references found in codebase
- Transform ThrottleOnCriticalHeapUsageExecutor from immediate task rejection to queue-based approach
- Add configurable task queue (default: 1000 tasks, 30s timeout, 1s monitoring interval)
- Implement background monitoring to process queued tasks when heap usage drops below critical level
- Add comprehensive metrics tracking (queued, processed, timed-out task counts)
- Support both Runnable and Callable task queuing with proper timeout handling
- Maintain backward compatibility with existing constructor
- Add extensive test coverage for queuing, timeout, overflow, and shutdown scenarios

This enhancement provides graceful degradation during high heap usage periods instead of
immediate query failures, improving overall system resilience and user experience.
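
The queue-based behavior described in this commit message can be illustrated with the simplified sketch below. It is not the ThrottleOnCriticalHeapUsageExecutor implementation: the class is hypothetical, tasks run on the calling thread under a single lock, and the heap check and the clock are injected suppliers so the logic stays deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical, single-lock sketch of "queue instead of reject" under heap pressure.
final class QueueingThrottleSketch {
  private static final class Pending {
    final Runnable _task;
    final long _deadlineMillis;

    Pending(Runnable task, long deadlineMillis) {
      _task = task;
      _deadlineMillis = deadlineMillis;
    }
  }

  private final Queue<Pending> _queue = new ArrayDeque<>();
  private final BooleanSupplier _heapCritical;
  private final LongSupplier _clockMillis;
  private final int _maxQueueSize;
  private final long _timeoutMillis;
  private int _timedOutCount;

  QueueingThrottleSketch(BooleanSupplier heapCritical, LongSupplier clockMillis, int maxQueueSize,
      long timeoutMillis) {
    _heapCritical = heapCritical;
    _clockMillis = clockMillis;
    _maxQueueSize = maxQueueSize;
    _timeoutMillis = timeoutMillis;
  }

  /** Runs the task now if heap usage is fine, queues it otherwise; false means the queue is full. */
  synchronized boolean submit(Runnable task) {
    if (!_heapCritical.getAsBoolean()) {
      task.run();
      return true;
    }
    if (_queue.size() >= _maxQueueSize) {
      return false;
    }
    _queue.add(new Pending(task, _clockMillis.getAsLong() + _timeoutMillis));
    return true;
  }

  /** Called periodically by a monitor: drops expired tasks, runs the rest once heap usage recovers. */
  synchronized int drain() {
    int ran = 0;
    while (!_queue.isEmpty()) {
      Pending head = _queue.peek();
      if (_clockMillis.getAsLong() > head._deadlineMillis) {
        _queue.poll();
        _timedOutCount++;
      } else if (_heapCritical.getAsBoolean()) {
        break; // still under pressure: keep waiting
      } else {
        _queue.poll();
        head._task.run();
        ran++;
      }
    }
    return ran;
  }

  synchronized int queuedCount() {
    return _queue.size();
  }

  synchronized int timedOutCount() {
    return _timedOutCount;
  }
}
```

A real executor would hand tasks to a delegate thread pool rather than run them inline, and would call `drain()` from a scheduled monitoring task.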
@xiangfu0 xiangfu0 force-pushed the query-sucide-when-oom branch 2 times, most recently from 4d8ddec to 777d6c2 Compare July 24, 2025 11:56
…ry pressure mode

Adding more trace tracking for join/window operators
@xiangfu0 xiangfu0 force-pushed the query-sucide-when-oom branch from 777d6c2 to be9fb8a Compare July 24, 2025 13:14
Labels

Configuration Config changes (addition/deletion/change in behavior) query

5 participants