Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
The basic challenge is that, as of today, DataFusion can use an unbounded amount of memory to run a plan, and it is neither possible to calculate that memory beforehand nor to limit its use.
If DataFusion processes individual partitions that are larger than the available system memory, it currently keeps allocating memory from the system until it is killed by the OS or the container runtime.
Also, when multiple DataFusion plans run in the same process, each consumes memory without limit. It may be desirable to reserve or cap the memory used by any individual plan so that the plans together do not exceed the system memory budget.
Thus, it would be nice if we could give DataFusion's plans a memory budget that they then stay under.
Describe the solution you'd like
- Add an option to ExecutionConfig that has a “total plan memory budget”
- Add logic to each node that requires a memory buffer to ensure it stays under the limit.
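As a minimal sketch of how a plan-wide budget could be shared by the operators, assume every operator holds a handle to one shared counter and must reserve bytes before allocating a buffer, receiving an error instead of growing past the limit. The names `MemoryBudget`, `try_reserve`, `free`, and `ResourcesExhausted` are hypothetical and do not correspond to DataFusion's actual API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical error returned when a reservation would exceed the budget.
#[derive(Debug, PartialEq)]
pub struct ResourcesExhausted {
    pub requested: usize,
    pub available: usize,
}

/// Hypothetical "total plan memory budget" shared by all operators of a plan.
#[derive(Debug)]
pub struct MemoryBudget {
    limit: usize,
    used: AtomicUsize,
}

impl MemoryBudget {
    pub fn new(limit: usize) -> Arc<Self> {
        Arc::new(Self { limit, used: AtomicUsize::new(0) })
    }

    /// Atomically reserve `bytes`, or fail without changing the counter
    /// if the reservation would push usage past the limit.
    pub fn try_reserve(&self, bytes: usize) -> Result<(), ResourcesExhausted> {
        let mut current = self.used.load(Ordering::Relaxed);
        loop {
            match current.checked_add(bytes).filter(|n| *n <= self.limit) {
                None => {
                    return Err(ResourcesExhausted {
                        requested: bytes,
                        available: self.limit.saturating_sub(current),
                    })
                }
                // Retry if another operator changed the counter concurrently.
                Some(next) => match self.used.compare_exchange_weak(
                    current,
                    next,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return Ok(()),
                    Err(actual) => current = actual,
                },
            }
        }
    }

    /// Return previously reserved bytes to the budget.
    pub fn free(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::AcqRel);
    }

    pub fn used(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
```

For example, with a 1024-byte budget, a Sort that reserves 600 bytes succeeds, and a Join that then asks for another 600 bytes gets `ResourcesExhausted` until the Sort frees its reservation. Using a compare-and-swap loop keeps the check and the update atomic when operators run on different threads.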
The operators that can use large amounts of memory today are:
- Sort
- Join
- GroupByHash
There are many potential ways to ensure the limit is respected:
- (Simplest) error if the budget is exceeded
- (more complex): employ algorithms that can use secondary storage (e.g. temp files), such as a sort that spills multiple rounds of partially sorted results to disk and then performs a final merge phase to produce the globally ordered partition
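The spilling sort described above can be sketched on a stream of `i64` values: buffer at most a fixed number of rows, sort and write each full buffer to a temp file as a "run", then k-way merge the runs with a min-heap. This is a toy illustration, assuming memory is measured in rows and runs are stored as text; a real operator would spill Arrow record batches and account for bytes. The function `external_sort` is hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fs::File;
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};

// Counter that gives every spill file a unique name within this process.
static SPILL_ID: AtomicUsize = AtomicUsize::new(0);

/// Sort `input` while holding at most `max_rows_in_memory` values at once.
pub fn external_sort(input: impl IntoIterator<Item = i64>, max_rows_in_memory: usize) -> io::Result<Vec<i64>> {
    assert!(max_rows_in_memory > 0);
    let mut runs: Vec<PathBuf> = Vec::new();
    let mut buffer: Vec<i64> = Vec::with_capacity(max_rows_in_memory);
    for value in input {
        buffer.push(value);
        if buffer.len() == max_rows_in_memory {
            runs.push(spill_run(&mut buffer)?); // budget reached: spill a sorted run
        }
    }
    if !buffer.is_empty() {
        runs.push(spill_run(&mut buffer)?);
    }
    let result = merge_runs(&runs);
    for path in &runs {
        let _ = std::fs::remove_file(path); // clean up temp files
    }
    result
}

/// Sort the in-memory buffer, write it to a new temp file, and empty the buffer.
fn spill_run(buffer: &mut Vec<i64>) -> io::Result<PathBuf> {
    buffer.sort_unstable();
    let id = SPILL_ID.fetch_add(1, Ordering::Relaxed);
    let path = std::env::temp_dir().join(format!("spill-{}-{}.txt", std::process::id(), id));
    let mut writer = BufWriter::new(File::create(&path)?);
    for v in buffer.drain(..) {
        writeln!(writer, "{}", v)?;
    }
    writer.flush()?;
    Ok(path)
}

/// Final merge phase: repeatedly emit the smallest head value among all runs.
fn merge_runs(runs: &[PathBuf]) -> io::Result<Vec<i64>> {
    let mut readers = Vec::new();
    for path in runs {
        readers.push(BufReader::new(File::open(path)?).lines());
    }
    let mut heap = BinaryHeap::new(); // Reverse turns the max-heap into a min-heap
    for (i, reader) in readers.iter_mut().enumerate() {
        if let Some(line) = reader.next() {
            heap.push(Reverse((parse(line?)?, i)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i))) = heap.pop() {
        out.push(v);
        if let Some(line) = readers[i].next() {
            heap.push(Reverse((parse(line?)?, i)));
        }
    }
    Ok(out)
}

fn parse(line: String) -> io::Result<i64> {
    line.trim()
        .parse::<i64>()
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
```

Only one row per run is held in the heap during the merge, so the merge phase needs memory proportional to the number of runs rather than the size of the partition.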
Describe alternatives you've considered
There are some interesting tradeoffs between “up front allocation”, which divides memory ahead of time among all operators that need it, and a more dynamic approach.
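A toy model can make the tradeoff concrete. Assume each operator's peak demand is known in bytes: up-front allocation gives every operator an equal share of the total, while a dynamic pool hands out memory first come, first served. Both functions are hypothetical illustrations, not proposed APIs.

```rust
/// Up-front allocation: split the budget evenly; each operator is capped at its share.
pub fn fits_static(total: usize, demands: &[usize]) -> Vec<bool> {
    if demands.is_empty() {
        return Vec::new();
    }
    let share = total / demands.len();
    demands.iter().map(|&d| d <= share).collect()
}

/// Dynamic allocation: operators draw from one shared pool in arrival order.
pub fn fits_dynamic(total: usize, demands: &[usize]) -> Vec<bool> {
    let mut remaining = total;
    demands
        .iter()
        .map(|&d| {
            if d <= remaining {
                remaining -= d;
                true
            } else {
                false
            }
        })
        .collect()
}
```

With a 100-byte budget and demands `[70, 10, 10]` (e.g. a large Sort plus a small Join and GroupByHash), the static split rejects the Sort even though the total fits, while the dynamic pool accepts all three. With demands `[95, 10]`, however, the dynamic pool lets the first operator starve the second, whereas the static split guarantees the small operator its share.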
This will likely require major effort across many different issues. I suggest we use this issue to implement a simple "error if over limit" strategy and then work on more sophisticated strategies subsequently.
Progress tracking
Added Jan 2022:
Remaining Work
- Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation #1526
- [Epic] Generate runtime errors if the memory budget is exceeded #3941
- [EPIC] Memory Limited Sort (Externalized / Spill) #1568
- Memory Limited GroupBy (Externalized / Spill) #1570
- Memory Limited Joins (Externalized / Spill) #1599
- Track memory usage in Non Limited Operators #1569
- DiskManager Performs Blocking IO #1637
- Unlimited memory consumption in RepartitionExec #4816