
[PoC] Add API for tracking distinct buffers in MemoryPool by reference count #16359


Draft
wants to merge 18 commits into main
Conversation


@Dandandan Dandandan commented Jun 10, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

Currently, memory is tracked based on self-reporting of bytes by each individual consumer. While that often works, it over-reports the amount of memory:

  • Whenever a child operator produces data by calling slice on the RecordBatch / arrays (this over-reports a lot for very large RecordBatches, i.e. those coming out of aggregates); see the sketch below this list.
  • Whenever child arrays are re-used within the same RecordBatch (for example, the same array appearing twice under different names).
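For illustration (a minimal standalone sketch, not code from this PR): a slice shares the underlying buffers of the original array, so each consumer's self-reported size still includes the full buffer, and summing self-reported sizes counts the same bytes more than once.

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array};

fn main() {
    // A large array (~8 MB of i64 values).
    let big: ArrayRef = Arc::new(Int64Array::from_iter_values(0..1_000_000));
    // A 10-row slice shares the same underlying buffer...
    let small = big.slice(0, 10);
    // ...so its self-reported size is still roughly the full 8 MB; summing
    // self-reported sizes across consumers charges the same buffer twice.
    println!("full:  {} bytes", big.get_array_memory_size());
    println!("slice: {} bytes", small.get_array_memory_size());
}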

This PR proposes an API extension to MemoryPool and MemoryReservation that tracks Arc<dyn Array> instances based on their memory address.
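A minimal sketch of the idea (the names below are illustrative, not the PR's actual API): key a reference count on each buffer's data-pointer address, so a buffer shared by several arrays or slices is charged to the pool only once.

use std::collections::HashMap;
use std::sync::Mutex;
use arrow::array::{Array, ArrayRef};

#[derive(Debug, Default)]
struct BufferTracker {
    // buffer address -> (reference count, buffer capacity in bytes)
    references: Mutex<HashMap<usize, (usize, usize)>>,
}

impl BufferTracker {
    /// Returns the number of newly charged bytes for `array`, counting each
    /// distinct buffer only once across all tracked arrays. For brevity this
    /// only walks top-level buffers, not nested child data.
    fn track(&self, array: &ArrayRef) -> usize {
        let mut refs = self.references.lock().unwrap();
        let mut newly_charged = 0;
        let data = array.to_data();
        for buffer in data.buffers() {
            let addr = buffer.as_ptr() as usize;
            let entry = refs.entry(addr).or_insert((0, buffer.capacity()));
            if entry.0 == 0 {
                newly_charged += entry.1;
            }
            entry.0 += 1;
        }
        newly_charged
    }
}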

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the execution (Related to the execution crate) and physical-plan (Changes to the physical-plan crate) labels Jun 10, 2025
@Dandandan Dandandan changed the title Add API for tracking distinct arrays Add API for tracking distinct arrays in MemoryPool Jun 10, 2025
@Dandandan Dandandan changed the title Add API for tracking distinct arrays in MemoryPool Add API for tracking distinct arrays in MemoryPool by reference count Jun 10, 2025
@Dandandan Dandandan changed the title Add API for tracking distinct arrays in MemoryPool by reference count [PoC] Add API for tracking distinct arrays in MemoryPool by reference count Jun 10, 2025
@Dandandan Dandandan changed the title [PoC] Add API for tracking distinct arrays in MemoryPool by reference count [PoC] Add API for tracking distinct buffers in MemoryPool by reference count Jun 10, 2025

@2010YOUY01 2010YOUY01 left a comment


Thank you, this solves the memory overcounting issue across batches. I have some suggestions/questions.

This new pool implementation might cause some issues for the MemoryReservation API:
If we first count the memory usage of several arrays via the grow_with_arrays() interface, and then use the following functions to release it (like reservation.resize(0)), the memory-used value will be subtracted, but the inner hash table won't be cleared:

pub fn free(&mut self) -> usize {
    let size = self.size;
    if size != 0 {
        self.shrink(size)
    }
    size
}

pub fn resize(&mut self, capacity: usize) {
    match capacity.cmp(&self.size) {
        Ordering::Greater => self.grow(capacity - self.size),
        Ordering::Less => self.shrink(self.size - capacity),
        _ => {}
    }
}

This causes a potentially inconsistent state; maybe we can use different semantics for the MemoryReservation API:

  • Memory for arrays can only be managed with grow_with_arrays()/shrink_with_arrays()
  • Other interfaces like shrink()/grow() are used for other memory usage

And the implementation would look like:

#[derive(Debug)]
pub struct GreedyMemoryPoolWithTracking {
    pool_size: usize,
    used_others: AtomicUsize, // managed with non-array APIs
    
    used_array: AtomicUsize, // ref-counted with buffer addr stored in the hash table
    references: Mutex<HashMap<usize, usize>>,
}
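For example, the release path could look roughly like this (a sketch under the semantics above; the struct and method names are illustrative, not the PR's actual API): shrink_with_arrays() decrements each buffer's reference count and only subtracts a buffer's bytes once its count reaches zero, keeping the hash table consistent with used_array.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use arrow::array::{Array, ArrayRef};

struct ArrayTracking {
    used_array: AtomicUsize,                  // total bytes charged for arrays
    references: Mutex<HashMap<usize, usize>>, // buffer addr -> ref count
}

impl ArrayTracking {
    // Decrement each buffer's ref count; release its bytes only when the
    // count drops to zero, so the hash table stays in sync with `used_array`.
    fn shrink_with_arrays(&self, arrays: &[ArrayRef]) {
        let mut refs = self.references.lock().unwrap();
        let mut released = 0;
        for array in arrays {
            let data = array.to_data();
            for buffer in data.buffers() {
                let addr = buffer.as_ptr() as usize;
                if let Entry::Occupied(mut entry) = refs.entry(addr) {
                    *entry.get_mut() -= 1;
                    if *entry.get() == 0 {
                        entry.remove();
                        released += buffer.capacity();
                    }
                }
            }
        }
        self.used_array.fetch_sub(released, Ordering::Relaxed);
    }
}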

fn grow_with_arrays(
    &self,
    reservation: &MemoryReservation,
    arrays: &[Arc<dyn Array>],
Contributor


Can we make the API take RecordBatch instead of arrays? Inside DataFusion it's more common to pass batches around, and we can use a utility function to do batch -> [Array] for array usages.

Contributor Author


That's possible, let me have a look at whether we can always use RecordBatch. My concern was that we might not always have a RecordBatch, but might have an Array; in that case, conversion to a RecordBatch would be strange.
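If the API does end up taking RecordBatch, the batch -> arrays direction is trivial, since a batch already exposes its columns as &[ArrayRef] (a hypothetical helper, for illustration only):

use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;

// Hypothetical helper: a RecordBatch already exposes its columns as `&[ArrayRef]`,
// so an array-based API can be reused for batches.
fn batch_arrays(batch: &RecordBatch) -> &[ArrayRef] {
    batch.columns()
}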

@Dandandan
Contributor Author


Hi

Thank you, this solves the memory overcounting issue across batches. I have some suggestions/questions.

This new pool implementation might cause some issues for the MemoryReservation API: If we first count the memory usage of several arrays via the grow_with_arrays() interface, and then use the following functions to release it (like reservation.resize(0)), the memory-used value will be subtracted, but the inner hash table won't be cleared:

pub fn free(&mut self) -> usize {
    let size = self.size;
    if size != 0 {
        self.shrink(size)
    }
    size
}

pub fn resize(&mut self, capacity: usize) {
    match capacity.cmp(&self.size) {
        Ordering::Greater => self.grow(capacity - self.size),
        Ordering::Less => self.shrink(self.size - capacity),
        _ => {}
    }
}

This causes a potentially inconsistent state; maybe we can use different semantics for the MemoryReservation API:

  • Memory for arrays can only be managed with grow_with_arrays()/shrink_with_arrays()
  • Other interfaces like shrink()/grow() are used for other memory usage

And the implementation would look like:

#[derive(Debug)]
pub struct GreedyMemoryPoolWithTracking {
    pool_size: usize,
    used_others: AtomicUsize, // managed with non-array APIs
    
    used_array: AtomicUsize, // ref-counted with buffer addr stored in the hash table
    references: Mutex<HashMap<usize, usize>>,
}

Thanks for the feedback!

Yes, tracking the size separately makes sense, I'll change that!
