Skip to content

[PoC] Add API for tracking distinct buffers in MemoryPool by reference count #16359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use arrow::array::Array;
use datafusion_common::{internal_err, Result};
use std::hash::{Hash, Hasher};
use std::vec;
use std::{cmp::Ordering, sync::atomic, sync::Arc};

mod pool;
Expand Down Expand Up @@ -131,14 +133,58 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// This must always succeed
fn grow(&self, reservation: &MemoryReservation, additional: usize);

/// Infallibly grow the provided `reservation` by the bytes held in `&[Arc<dyn Array>]`
///
/// This defaults to summing the memory size of all arrays, but can be
/// overridden by implementations that track the memory size of Array usages
fn grow_with_arrays(
&self,
reservation: &MemoryReservation,
arrays: &[Arc<dyn Array>],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the API take RecordBatch instead of arrays? Since inside df it's more common to passing batches around, and we can use a utility function to do batch -> [Array] for array usages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's possible, let me have a look at whether we can use RecordBatch always. My concern was we might not always have a RecordBatch, but might have an Array. In that case conversion to RecordBatch would be strange.

) {
let additional = arrays
.iter()
.map(|array| array.get_array_memory_size())
.sum();
self.grow(reservation, additional);
}

/// Infallibly shrink the provided `reservation` by `shrink` bytes
fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

/// Infallibly shrink the provided `reservation` by the total memory size
/// of the given `arrays`.
///
/// This defaults to summing the memory size of all arrays, but can be
/// overridden by implementations that track the memory size of Array usages
/// (mirrors [`Self::grow_with_arrays`]).
fn shrink_with_arrays(
    &self,
    reservation: &MemoryReservation,
    arrays: &[Arc<dyn Array>],
) {
    let shrink = arrays
        .iter()
        .map(|array| array.get_array_memory_size())
        .sum();
    self.shrink(reservation, shrink);
}

/// Attempt to grow the provided `reservation` by `additional` bytes
///
/// On error the `allocation` will not be increased in size
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

/// Attempt to grow the provided `reservation` by the total memory size
/// of the given `arrays` (fallible counterpart of `grow_with_arrays`).
///
/// This defaults to summing the memory size of all arrays, but can be
/// overridden by implementations that track the memory size of Array usages.
///
/// On error the reservation is not increased in size (see [`Self::try_grow`]).
fn try_grow_with_arrays(
    &self,
    reservation: &MemoryReservation,
    arrays: &[Arc<dyn Array>],
) -> Result<()> {
    let additional = arrays
        .iter()
        .map(|array| array.get_array_memory_size())
        .sum();
    self.try_grow(reservation, additional)
}

/// Return the total amount of memory reserved
fn reserved(&self) -> usize;

Expand Down Expand Up @@ -261,6 +307,7 @@ impl MemoryConsumer {
consumer: self,
}),
size: 0,
arrays: Vec::new(),
}
}
}
Expand Down Expand Up @@ -290,6 +337,8 @@ impl Drop for SharedRegistration {
pub struct MemoryReservation {
registration: Arc<SharedRegistration>,
size: usize,
// arrays tracked by this reservation
arrays: Vec<Arc<dyn Array>>,
}

impl MemoryReservation {
Expand All @@ -310,6 +359,11 @@ impl MemoryReservation {
if size != 0 {
self.shrink(size)
}

self.registration
.pool
.shrink_with_arrays(self, &self.arrays);
self.arrays.clear();
size
}

Expand Down Expand Up @@ -375,6 +429,18 @@ impl MemoryReservation {
Ok(())
}

/// Try to increase this reservation by the memory held in the provided
/// `arrays`, as accounted by the pool's `try_grow_with_arrays`.
///
/// On success the arrays are retained by this reservation so the pool
/// can release their accounting when the reservation is freed.
///
/// # Errors
///
/// Returns an error if the underlying pool rejects the growth; in that
/// case no arrays are retained.
pub fn try_grow_with_arrays(&mut self, arrays: &[Arc<dyn Array>]) -> Result<()> {
    self.registration.pool.try_grow_with_arrays(self, arrays)?;
    // NOTE(review): `self.size` is intentionally NOT increased here — the
    // pool tracks array-based usage separately from byte-based usage.
    // Confirm this interaction with `free()`/`Drop` is intended.
    arrays
        .iter()
        .for_each(|array| self.arrays.push(Arc::clone(array)));

    Ok(())
}

/// Splits off `capacity` bytes from this [`MemoryReservation`]
/// into a new [`MemoryReservation`] with the same
/// [`MemoryConsumer`].
Expand All @@ -390,6 +456,7 @@ impl MemoryReservation {
Self {
size: capacity,
registration: Arc::clone(&self.registration),
arrays: vec![],
}
}

Expand All @@ -398,6 +465,7 @@ impl MemoryReservation {
Self {
size: 0,
registration: Arc::clone(&self.registration),
arrays: vec![],
}
}

Expand Down
139 changes: 139 additions & 0 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion_common::HashMap;
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use std::sync::Arc;
use std::{
num::NonZeroUsize,
sync::atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -112,6 +113,144 @@ impl MemoryPool for GreedyMemoryPool {
}
}

/// A [`MemoryPool`] that implements a greedy first-come first-serve limit
/// and tracks the memory usage based on the references to the arrays, so
/// that arrays sharing the same underlying buffer are only accounted once.
///
/// This pool works well for queries that do not need to spill or have
/// a single spillable operator. See [`FairSpillPool`] if there are
/// multiple spillable operators that all will spill.
#[derive(Debug)]
pub struct GreedyMemoryPoolWithTracking {
    // Maximum number of bytes this pool may hand out
    pool_size: usize,
    // Number of bytes currently reserved
    used: AtomicUsize,
    // Map from buffer data-pointer address to the number of tracked references
    references: Mutex<HashMap<usize, usize>>,
}

impl GreedyMemoryPoolWithTracking {
    /// Create a new pool that can allocate up to `pool_size` bytes
    pub fn new(pool_size: usize) -> Self {
        // Log the correct type name (was copy-pasted from `GreedyMemoryPool`)
        debug!("Created new GreedyMemoryPoolWithTracking(pool_size={pool_size})");
        Self {
            pool_size,
            used: AtomicUsize::new(0),
            references: Mutex::new(HashMap::new()),
        }
    }
}

impl MemoryPool for GreedyMemoryPoolWithTracking {
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }

    /// Grow the reservation by the capacity of each *distinct* buffer in
    /// `arrays`: a buffer shared by several arrays (e.g. slices) is only
    /// accounted for the first time it is seen.
    fn grow_with_arrays(
        &self,
        reservation: &MemoryReservation,
        arrays: &[Arc<dyn arrow::array::Array>],
    ) {
        for array in arrays {
            let array_data = array.to_data();
            for buffer in array_data.buffers() {
                // Buffers are identified by the address of their backing
                // allocation
                let addr = buffer.data_ptr().as_ptr() as usize;
                let ref_count = {
                    let mut references = self.references.lock();
                    let count = references.entry(addr).or_insert(0);
                    // Increment the *reference count* by one (previously this
                    // added `array.get_array_memory_size()`, corrupting the
                    // count)
                    *count += 1;
                    *count
                };

                // If this is the first time we see this buffer, we need to
                // grow the pool by its capacity
                if ref_count == 1 {
                    self.grow(reservation, buffer.capacity());
                }
            }
        }
    }

    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    /// Shrink the reservation by the capacity of each distinct buffer whose
    /// last tracked reference is released by this call.
    fn shrink_with_arrays(
        &self,
        reservation: &MemoryReservation,
        arrays: &[Arc<dyn arrow::array::Array>],
    ) {
        for array in arrays {
            let array_data = array.to_data();
            for buffer in array_data.buffers() {
                let addr = buffer.data_ptr().as_ptr() as usize;
                let remaining = {
                    let mut references = self.references.lock();
                    match references.get_mut(&addr) {
                        Some(count) => {
                            // Decrement the reference count by one (previously
                            // this subtracted `buffer.len()`, which could
                            // underflow), and remove the entry once it reaches
                            // zero so the map does not grow without bound
                            *count -= 1;
                            let remaining = *count;
                            if remaining == 0 {
                                references.remove(&addr);
                            }
                            Some(remaining)
                        }
                        // Buffer was never tracked (or already released); do
                        // not insert a phantom entry on shrink
                        None => None,
                    }
                };

                // If this was the last reference to this buffer, return its
                // capacity to the pool
                if remaining == Some(0) {
                    self.shrink(reservation, buffer.capacity());
                }
            }
        }
    }

    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                let new_used = used + additional;
                (new_used <= self.pool_size).then_some(new_used)
            })
            .map_err(|used| {
                insufficient_capacity_err(
                    reservation,
                    additional,
                    self.pool_size.saturating_sub(used),
                )
            })?;
        Ok(())
    }

    /// Fallible version of [`Self::grow_with_arrays`]: each distinct buffer
    /// is reserved via [`Self::try_grow`].
    ///
    /// NOTE(review): buffers reserved earlier in the same call are not rolled
    /// back when a later buffer fails to reserve — confirm callers treat the
    /// reservation as poisoned on error.
    fn try_grow_with_arrays(
        &self,
        reservation: &MemoryReservation,
        arrays: &[Arc<dyn arrow::array::Array>],
    ) -> Result<()> {
        for array in arrays.iter() {
            let array_data = array.to_data();
            for buffer in array_data.buffers() {
                let addr = buffer.data_ptr().as_ptr() as usize;
                let ref_count = {
                    let mut references = self.references.lock();
                    let count = references.entry(addr).or_insert(0);
                    *count += 1;
                    *count
                };

                // If this is the first time we see this buffer, try to
                // reserve its capacity
                if ref_count == 1 {
                    if let Err(e) = self.try_grow(reservation, buffer.capacity()) {
                        // Roll back the tracking entry so a failed attempt
                        // does not leave a phantom reference behind
                        self.references.lock().remove(&addr);
                        return Err(e);
                    }
                }
            }
        }
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }

    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Finite(self.pool_size)
    }
}

/// A [`MemoryPool`] that prevents spillable reservations from using more than
/// an even fraction of the available memory sans any unspillable reservations
/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async fn load_left_input(
|(mut batches, metrics, mut reservation), batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
reservation.try_grow(batch_size)?;
reservation.try_grow_with_arrays(batch.columns())?;
// Update metrics
metrics.build_mem_used.add(batch_size);
metrics.build_input_batches.add(1);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ async fn collect_left_input(
.try_fold(initial, |mut acc, batch| async {
let batch_size = get_record_batch_memory_size(&batch);
// Reserve memory for incoming batch
acc.3.try_grow(batch_size)?;
acc.3.try_grow_with_arrays(batch.columns())?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl RecursiveQueryStream {
mut self: std::pin::Pin<&mut Self>,
batch: RecordBatch,
) -> Poll<Option<Result<RecordBatch>>> {
if let Err(e) = self.reservation.try_grow(batch.get_array_memory_size()) {
if let Err(e) = self.reservation.try_grow_with_arrays(batch.columns()) {
return Poll::Ready(Some(Err(e)));
}

Expand Down
Loading