[PoC] Add API for tracking distinct buffers in MemoryPool by reference count #16359
base: main
Conversation
Thank you, this solves the memory over-counting issue across batches. I have some suggestions/questions.

This new pool implementation might cause an issue for the MemoryReservation API: if we first count the memory usage of several arrays via the grow_with_arrays() interface, and then release it with the existing functions (like reservation.resize(0)), the memory-used value will be subtracted, but the inner hash table won't be cleared:
datafusion/datafusion/execution/src/memory_pool/mod.rs
Lines 308 to 314 in f35416e

```rust
pub fn free(&mut self) -> usize {
    let size = self.size;
    if size != 0 {
        self.shrink(size)
    }
    size
}
```
datafusion/datafusion/execution/src/memory_pool/mod.rs
Lines 345 to 351 in f35416e

```rust
pub fn resize(&mut self, capacity: usize) {
    match capacity.cmp(&self.size) {
        Ordering::Greater => self.grow(capacity - self.size),
        Ordering::Less => self.shrink(self.size - capacity),
        _ => {}
    }
}
```
This causes a potentially inconsistent state. Maybe we can use different semantics for the MemoryReservation API:
- Memory for arrays can only be managed with grow_with_arrays()/shrink_with_arrays()
- Other interfaces like shrink()/grow() are used for all other memory usage
And the implementation would look like:

```rust
#[derive(Debug)]
pub struct GreedyMemoryPoolWithTracking {
    pool_size: usize,
    used_others: AtomicUsize, // managed with non-array APIs
    used_array: AtomicUsize,  // ref-counted, with buffer addresses stored in the hash table
    references: Mutex<HashMap<usize, usize>>,
}
```
```rust
fn grow_with_arrays(
    &self,
    reservation: &MemoryReservation,
    arrays: &[Arc<dyn Array>],
```
Can we make the API take a RecordBatch instead of arrays? Inside DataFusion it's more common to pass batches around, and we can use a utility function to do batch -> [Array] for array usages.
That's possible, let me have a look at whether we can always use a RecordBatch. My concern was that we might not always have a RecordBatch, but might have an Array; in that case, conversion to a RecordBatch would be strange.
Hi, thanks for the feedback! Yes, tracking the sizes separately makes sense, I'll change that.
Which issue does this PR close?
Rationale for this change
Currently, memory is tracked based on the self-reporting of bytes by each individual consumer. While that often works, it will over-report the amount of memory:
- after a slice on the RecordBatch / arrays (this over-reports a lot for very large RecordBatches, i.e. those coming out of aggregates);
- when the same buffers are referenced from more than one RecordBatch (for example, two same arrays with a different name).

This PR proposes an API extension to MemoryPool and MemoryReservation that tracks Arc<dyn Array>
based on their memory address.

What changes are included in this PR?
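The address-based deduplication this PR proposes can be sketched with a std-only toy (the names here are illustrative; real code would deduplicate Arrow buffer addresses, not `Arc<Vec<u8>>`): two logical arrays sharing one buffer are counted twice by naive self-reporting, but once when sizes are deduplicated by memory address.

```rust
use std::collections::HashSet;
use std::sync::Arc;

fn main() {
    // One 4 KiB buffer shared by two "arrays" (e.g. an array and a slice of it,
    // or the same column appearing under two names).
    let buffer: Arc<Vec<u8>> = Arc::new(vec![0u8; 4096]);
    let arrays = vec![buffer.clone(), buffer.clone()];

    // Naive self-reported accounting counts the shared buffer twice.
    let naive: usize = arrays.iter().map(|a| a.len()).sum();

    // Deduplicating by the buffer's memory address counts it once.
    let mut seen = HashSet::new();
    let deduped: usize = arrays
        .iter()
        .filter(|a| seen.insert(Arc::as_ptr(a) as usize))
        .map(|a| a.len())
        .sum();

    assert_eq!(naive, 8192);
    assert_eq!(deduped, 4096);
    println!("naive = {}, deduped = {}", naive, deduped);
}
```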
Are these changes tested?
Are there any user-facing changes?