Add proactive per-query memory checking to prevent OOM#16378
Add proactive per-query memory checking to prevent OOM#16378xiangfu0 wants to merge 8 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR implements proactive per-query memory checking to prevent OOM issues by allowing operators to monitor memory usage and terminate queries that exceed configured limits. The feature provides adaptive default memory limits based on JVM heap size and comprehensive configuration options.
- Adds memory threshold checking functionality to the
ThreadResourceUsageAccountantinterface - Integrates memory checks into operator execution paths (
BaseOperatorandMultiStageOperator) - Implements configurable per-query memory limits with adaptive defaults (1/3 of heap size)
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| CommonConstants.java | Adds configuration constants for enabling per-query memory checks and setting memory limits |
| ThreadResourceUsageAccountant.java | Adds checkMemoryAndInterruptIfExceeded() method to the interface |
| Tracing.java | Adds utility methods for memory checking and thread interruption |
| PerQueryCPUMemAccountantFactory.java | Implements core memory checking logic with error handling and thread interruption |
| BaseOperator.java | Integrates memory checks into the nextBlock() execution flow |
| MultiStageOperator.java | Integrates memory checks into the nextBlock() execution flow |
| PerQueryCPUMemAccountantTest.java | Adds comprehensive unit tests for all memory checking scenarios |
Comments suppressed due to low confidence (1)
pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java:395
- [nitpick] The method name 'interruptRunnerThread' is ambiguous - it's unclear what constitutes a 'runner thread'. Consider renaming to 'interruptAnchorThread' or 'interruptQueryThread' to better reflect its purpose.
public static void interruptRunnerThread() {
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
Outdated
Show resolved
Hide resolved
pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
Show resolved
Hide resolved
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #16378 +/- ##
============================================
+ Coverage 62.90% 63.17% +0.26%
+ Complexity 1386 1363 -23
============================================
Files 2867 3006 +139
Lines 163354 174403 +11049
Branches 24952 26680 +1728
============================================
+ Hits 102755 110174 +7419
- Misses 52847 55813 +2966
- Partials 7752 8416 +664
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
vrajat
left a comment
There was a problem hiding this comment.
My suggestion is - do not expose this feature in ThreadResourceUsageAccountant. Instead add a generic function in that interface - isQueryTerminated() which can check all the conditions based on the capabilities of the Accountant.
Then PerQueryCPUMemAccountant and ResourceUsageAccountant can evolve independently.
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
Show resolved
Hide resolved
| return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel(); | ||
| } | ||
|
|
||
| public void checkMemoryAndInterruptIfExceeded() { |
There was a problem hiding this comment.
Instead of interrupting the anchor thread, this function can be called from Tracing.ThreadAccountantOps.isInterrupted(). Specifically:
return Tracing.getThreadAccountant.checkMemoryAndInterruptIfExceeded() || Thread.interrupted() || Tracing.getThreadAccountant().isAnchorThreadInterrupted();
A better long term solution is to change ``Tracing.ThreadAccountantOps.isInterrupted()` to
return Thread.interrupted() || Tracing.getThreadAccountant().isQueryTerminated();
ThreadResourceUsageAccountant.isQueryTerminated() is a new function:
return checkMemoryAndInterruptIfExceeded() || isAnchorThreadInterrupted()
The advantage is that new conditions can be added depending on the capability of the accountant such as also interrupting in panic mode.
| public static final String CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK = "accounting.per.query.memory.check.enabled"; | ||
| public static final boolean DEFAULT_ENABLE_PER_QUERY_MEMORY_CHECK = false; | ||
|
|
||
| public static final String CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES = "accounting.per.query.memory.limit.bytes"; |
There was a problem hiding this comment.
Should there be thread in the name as single thread's allocation is checked ? Or improve the capability later to consider aggregate ?
|
Check #16380 for an example of using |
0598e36 to
468ab42
Compare
| throw new EarlyTerminationException("Interrupted while processing next block"); | ||
| } | ||
|
|
||
| // Check per-thread memory usage and terminate query proactively if threshold exceeded |
There was a problem hiding this comment.
Could it be overly chatty for light operators that iterate in a tight loop (e.g., FilterOperator on a single segment). Would the following addition make sense?
1. Short-circuiting if the thread’s memory sample hasn’t changed since last check.
2. Sampling every N blocks (configurable) instead of every block.
Even though each step is cheap, doing it 1000 times for a query that finishes in ~10 ms adds measurable overhead (extra branches, TLS lookups, potential volatile reads).
For heavier operators (joins, aggregations) the added cost is negligible, but for “light” ones the ratio might be higher.
| CommonConstants.Accounting.DEFAULT_PER_THREAD_QUERY_MEMORY_LIMIT_BYTES); | ||
| // If using default value, dynamically calculate based on actual heap size for better defaults | ||
| if (configuredLimit == CommonConstants.Accounting.DEFAULT_PER_THREAD_QUERY_MEMORY_LIMIT_BYTES) { | ||
| long maxHeapMemory = Runtime.getRuntime().maxMemory(); |
There was a problem hiding this comment.
Inside cgroup, sometime memory.limit_in_bytes < JVM -Xmx.
When computing default, is there a way to compare maxMemory() with /sys/fs/cgroup/memory.max if present.
| } else { | ||
| // Wait and let the outer loop re-evaluate - pause 100ms before next iteration | ||
| try { | ||
| Thread.sleep(100); |
There was a problem hiding this comment.
Do we want to consider exponential ?
Might reduce latency penalty if memory pressure clears quickly
27d1542 to
10917ca
Compare
This commit adds a new feature that allows operators to examine per-thread memory usage and proactively terminate queries when they exceed configured memory limits, preventing system-wide OOM issues. Key Changes: - Add checkMemoryAndInterruptIfExceeded() to ThreadResourceUsageAccountant interface - Implement memory checking logic in PerQueryCPUMemAccountantFactory - Add configuration options for enabling and setting per-query memory limits - Integrate memory checks into BaseOperator and MultiStageOperator nextBlock() calls - Add ThreadAccountantOps.checkMemoryAndInterruptIfExceeded() utility method - Add comprehensive unit tests covering all scenarios Configuration: - accounting.per.query.memory.check.enabled (default: false) - accounting.per.query.memory.limit.bytes (default: 1GB) Usage: Operators now automatically check memory usage on each nextBlock() call and will terminate queries with detailed error messages if limits are exceeded.
Changed DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES from fixed 1GB to dynamically calculated Runtime.getRuntime().maxMemory() / 3. This makes the default memory limit adaptive to the actual JVM heap size, providing better resource utilization and preventing OOM issues more effectively. - More appropriate for different deployment sizes - Scales with available memory - Maintains backward compatibility through configuration
…s method - Fix Runtime.getRuntime().maxMemory() at class loading time issue: * Change DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES to fixed 512MB default * Add dynamic calculation in constructor using 1/3 heap size (capped at 2GB) * Only use dynamic calculation when using default value - Rename ambiguous method interruptRunnerThread to interruptAnchorThread: * More clearly indicates it interrupts the anchor thread * Updated all references in QueryAggregator and Tracing classes
… and isQueryTerminated method - Add per-thread query memory configurations to QueryMonitorConfig for runtime changeability - Rename configurations to include 'thread' for clarity (CONFIG_OF_PER_THREAD_QUERY_MEMORY_*) - Add isQueryTerminated() method to ThreadResourceUsageAccountant interface - Implement isQueryTerminated() in PerQueryCPUMemAccountantFactory and ResourceUsageAccountantFactory - Update Tracing.ThreadAccountantOps.isInterrupted() to check for query termination - Update checkMemoryAndInterruptIfExceeded() to use QueryMonitorConfig for runtime settings - Update error messages to use 'per-thread' terminology for accuracy - Update all tests and utilities to use new configuration constants
- Remove CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK and DEFAULT_ENABLE_PER_QUERY_MEMORY_CHECK - Remove CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES and DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES - These constants have been replaced by per-thread equivalents for better clarity - No remaining references found in codebase
- Transform ThrottleOnCriticalHeapUsageExecutor from immediate task rejection to queue-based approach - Add configurable task queue (default: 1000 tasks, 30s timeout, 1s monitoring interval) - Implement background monitoring to process queued tasks when heap usage drops below critical level - Add comprehensive metrics tracking (queued, processed, timed-out task counts) - Support both Runnable and Callable task queuing with proper timeout handling - Maintain backward compatibility with existing constructor - Add extensive test coverage for queuing, timeout, overflow, and shutdown scenarios This enhancement provides graceful degradation during high heap usage periods instead of immediate query failures, improving overall system resilience and user experience.
4d8ddec to
777d6c2
Compare
…ry pressure mode Adding more trace tracking for join/window operators
777d6c2 to
be9fb8a
Compare
Summary
This PR adds a new feature that allows operators to examine per-thread memory usage and proactively terminate queries when they exceed configured memory limits, preventing system-wide OOM issues.
Key Changes
checkMemoryAndInterruptIfExceeded()toThreadResourceUsageAccountantinterfacePerQueryCPUMemAccountantFactoryBaseOperatorandMultiStageOperatornextBlock()callsThreadAccountantOps.checkMemoryAndInterruptIfExceeded()utility methodConfiguration
accounting.per.query.memory.check.enabled(default: false)accounting.per.query.memory.limit.bytes(default: adaptive based on heap size)Adaptive Default Memory Limit
The default memory limit now dynamically scales with the JVM heap size:
This provides better resource utilization while maintaining the ability to override via configuration.
Usage
Operators now automatically check memory usage on each
nextBlock()call and will terminate queries with detailed error messages if limits are exceeded.Testing
PerQueryCPUMemAccountantTestConfiguration Examples
Enable with Dynamic Limits (Recommended)
Enable with Fixed Limit
Disable (Default)