Skip to content

Commit 641338f

Browse files
yjshenalamb
andauthored
Add MemTrackingMetrics to ease memory tracking for non-limited memory consumers (#1691)
* Memory manager no longer track consumers, update aggregatedMetricsSet * Easy memory tracking with metrics * use tracking metrics in SPMS * tests * fix * doc * Update datafusion/src/physical_plan/sorts/sort.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * make tracker AtomicUsize Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent ab145c8 commit 641338f

File tree

13 files changed

+525
-386
lines changed

13 files changed

+525
-386
lines changed

datafusion/src/execution/memory_manager.rs

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
2020
use crate::error::{DataFusionError, Result};
2121
use async_trait::async_trait;
22-
use hashbrown::HashMap;
22+
use hashbrown::HashSet;
2323
use log::debug;
2424
use std::fmt;
2525
use std::fmt::{Debug, Display, Formatter};
2626
use std::sync::atomic::{AtomicUsize, Ordering};
27-
use std::sync::{Arc, Condvar, Mutex, Weak};
27+
use std::sync::{Arc, Condvar, Mutex};
2828

2929
static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
3030

@@ -245,10 +245,10 @@ The memory management architecture is the following:
245245
/// Manage memory usage during physical plan execution
246246
#[derive(Debug)]
247247
pub struct MemoryManager {
248-
requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
249-
trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
248+
requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
250249
pool_size: usize,
251250
requesters_total: Arc<Mutex<usize>>,
251+
trackers_total: AtomicUsize,
252252
cv: Condvar,
253253
}
254254

@@ -267,41 +267,47 @@ impl MemoryManager {
267267
);
268268

269269
Arc::new(Self {
270-
requesters: Arc::new(Mutex::new(HashMap::new())),
271-
trackers: Arc::new(Mutex::new(HashMap::new())),
270+
requesters: Arc::new(Mutex::new(HashSet::new())),
272271
pool_size,
273272
requesters_total: Arc::new(Mutex::new(0)),
273+
trackers_total: AtomicUsize::new(0),
274274
cv: Condvar::new(),
275275
})
276276
}
277277
}
278278
}
279279

280280
fn get_tracker_total(&self) -> usize {
281-
let trackers = self.trackers.lock().unwrap();
282-
if trackers.len() > 0 {
283-
trackers.values().fold(0usize, |acc, y| match y.upgrade() {
284-
None => acc,
285-
Some(t) => acc + t.mem_used(),
286-
})
287-
} else {
288-
0
289-
}
281+
self.trackers_total.load(Ordering::SeqCst)
290282
}
291283

292-
/// Register a new memory consumer for memory usage tracking
293-
pub(crate) fn register_consumer(&self, consumer: &Arc<dyn MemoryConsumer>) {
294-
let id = consumer.id().clone();
295-
match consumer.type_() {
296-
ConsumerType::Requesting => {
297-
let mut requesters = self.requesters.lock().unwrap();
298-
requesters.insert(id, Arc::downgrade(consumer));
299-
}
300-
ConsumerType::Tracking => {
301-
let mut trackers = self.trackers.lock().unwrap();
302-
trackers.insert(id, Arc::downgrade(consumer));
303-
}
304-
}
284+
pub(crate) fn grow_tracker_usage(&self, delta: usize) {
285+
self.trackers_total.fetch_add(delta, Ordering::SeqCst);
286+
}
287+
288+
pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
289+
let update =
290+
self.trackers_total
291+
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
292+
if x >= delta {
293+
Some(x - delta)
294+
} else {
295+
None
296+
}
297+
});
298+
update.expect(&*format!(
299+
"Tracker total memory shrink by {} underflow, current value is ",
300+
delta
301+
));
302+
}
303+
304+
fn get_requester_total(&self) -> usize {
305+
*self.requesters_total.lock().unwrap()
306+
}
307+
308+
/// Register a new memory requester
309+
pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
310+
self.requesters.lock().unwrap().insert(requester_id.clone());
305311
}
306312

307313
fn max_mem_for_requesters(&self) -> usize {
@@ -317,7 +323,6 @@ impl MemoryManager {
317323

318324
let granted;
319325
loop {
320-
let remaining = rqt_max - *rqt_current_used;
321326
let max_per_rqt = rqt_max / num_rqt;
322327
let min_per_rqt = max_per_rqt / 2;
323328

@@ -326,6 +331,7 @@ impl MemoryManager {
326331
break;
327332
}
328333

334+
let remaining = rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
329335
if remaining >= required {
330336
granted = true;
331337
*rqt_current_used += required;
@@ -347,46 +353,37 @@ impl MemoryManager {
347353

348354
fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
349355
let mut requesters_total = self.requesters_total.lock().unwrap();
356+
assert!(*requesters_total >= freed);
350357
*requesters_total -= freed;
351358
*requesters_total += acquired;
352359
self.cv.notify_all()
353360
}
354361

355-
/// Drop a memory consumer from memory usage tracking
356-
pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId) {
362+
/// Drop a memory consumer and reclaim the memory
363+
pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
357364
// find in requesters first
358365
{
359366
let mut requesters = self.requesters.lock().unwrap();
360-
if requesters.remove(id).is_some() {
361-
return;
367+
if requesters.remove(id) {
368+
let mut total = self.requesters_total.lock().unwrap();
369+
assert!(*total >= mem_used);
370+
*total -= mem_used;
362371
}
363372
}
364-
let mut trackers = self.trackers.lock().unwrap();
365-
trackers.remove(id);
373+
self.shrink_tracker_usage(mem_used);
374+
self.cv.notify_all();
366375
}
367376
}
368377

369378
impl Display for MemoryManager {
370379
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
371-
let requesters =
372-
self.requesters
373-
.lock()
374-
.unwrap()
375-
.values()
376-
.fold(vec![], |mut acc, consumer| match consumer.upgrade() {
377-
None => acc,
378-
Some(c) => {
379-
acc.push(format!("{}", c));
380-
acc
381-
}
382-
});
383-
let tracker_mem = self.get_tracker_total();
384380
write!(f,
385-
"MemoryManager usage statistics: total {}, tracker used {}, total {} requesters detail: \n {},",
386-
human_readable_size(self.pool_size),
387-
human_readable_size(tracker_mem),
388-
&requesters.len(),
389-
requesters.join("\n"))
381+
"MemoryManager usage statistics: total {}, trackers used {}, total {} requesters used: {}",
382+
human_readable_size(self.pool_size),
383+
human_readable_size(self.get_tracker_total()),
384+
self.requesters.lock().unwrap().len(),
385+
human_readable_size(self.get_requester_total()),
386+
)
390387
}
391388
}
392389

@@ -418,6 +415,8 @@ mod tests {
418415
use super::*;
419416
use crate::error::Result;
420417
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
418+
use crate::execution::MemoryConsumer;
419+
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
421420
use async_trait::async_trait;
422421
use std::sync::atomic::{AtomicUsize, Ordering};
423422
use std::sync::Arc;
@@ -487,6 +486,7 @@ mod tests {
487486

488487
impl DummyTracker {
489488
fn new(partition: usize, runtime: Arc<RuntimeEnv>, mem_used: usize) -> Self {
489+
runtime.grow_tracker_usage(mem_used);
490490
Self {
491491
id: MemoryConsumerId::new(partition),
492492
runtime,
@@ -528,23 +528,29 @@ mod tests {
528528
.with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap());
529529
let runtime = Arc::new(RuntimeEnv::new(config).unwrap());
530530

531-
let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
532-
runtime.register_consumer(&(tracker1.clone() as Arc<dyn MemoryConsumer>));
531+
DummyTracker::new(0, runtime.clone(), 5);
533532
assert_eq!(runtime.memory_manager.get_tracker_total(), 5);
534533

535-
let tracker2 = Arc::new(DummyTracker::new(0, runtime.clone(), 10));
536-
runtime.register_consumer(&(tracker2.clone() as Arc<dyn MemoryConsumer>));
534+
let tracker1 = DummyTracker::new(0, runtime.clone(), 10);
537535
assert_eq!(runtime.memory_manager.get_tracker_total(), 15);
538536

539-
let tracker3 = Arc::new(DummyTracker::new(0, runtime.clone(), 15));
540-
runtime.register_consumer(&(tracker3.clone() as Arc<dyn MemoryConsumer>));
537+
DummyTracker::new(0, runtime.clone(), 15);
541538
assert_eq!(runtime.memory_manager.get_tracker_total(), 30);
542539

543-
runtime.drop_consumer(tracker2.id());
540+
runtime.drop_consumer(tracker1.id(), tracker1.mem_used);
541+
assert_eq!(runtime.memory_manager.get_tracker_total(), 20);
542+
543+
// MemTrackingMetrics as an easy way to track memory
544+
let ms = ExecutionPlanMetricsSet::new();
545+
let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone());
546+
tracking_metric.init_mem_used(15);
547+
assert_eq!(runtime.memory_manager.get_tracker_total(), 35);
548+
549+
drop(tracking_metric);
544550
assert_eq!(runtime.memory_manager.get_tracker_total(), 20);
545551

546-
let requester1 = Arc::new(DummyRequester::new(0, runtime.clone()));
547-
runtime.register_consumer(&(requester1.clone() as Arc<dyn MemoryConsumer>));
552+
let requester1 = DummyRequester::new(0, runtime.clone());
553+
runtime.register_requester(requester1.id());
548554

549555
// first requester entered, should be able to use any of the remaining 80
550556
requester1.do_with_mem(40).await.unwrap();
@@ -553,8 +559,8 @@ mod tests {
553559
assert_eq!(requester1.mem_used(), 50);
554560
assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 50);
555561

556-
let requester2 = Arc::new(DummyRequester::new(0, runtime.clone()));
557-
runtime.register_consumer(&(requester2.clone() as Arc<dyn MemoryConsumer>));
562+
let requester2 = DummyRequester::new(0, runtime.clone());
563+
runtime.register_requester(requester2.id());
558564

559565
requester2.do_with_mem(20).await.unwrap();
560566
requester2.do_with_mem(30).await.unwrap();

datafusion/src/execution/runtime_env.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ use crate::{
2222
error::Result,
2323
execution::{
2424
disk_manager::{DiskManager, DiskManagerConfig},
25-
memory_manager::{
26-
MemoryConsumer, MemoryConsumerId, MemoryManager, MemoryManagerConfig,
27-
},
25+
memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
2826
},
2927
};
3028

@@ -71,13 +69,23 @@ impl RuntimeEnv {
7169
}
7270

7371
/// Register the consumer to get it tracked
74-
pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {
75-
self.memory_manager.register_consumer(memory_consumer);
72+
pub fn register_requester(&self, id: &MemoryConsumerId) {
73+
self.memory_manager.register_requester(id);
7674
}
7775

78-
/// Drop the consumer from get tracked
79-
pub fn drop_consumer(&self, id: &MemoryConsumerId) {
80-
self.memory_manager.drop_consumer(id)
76+
/// Drop the consumer from get tracked, reclaim memory
77+
pub fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
78+
self.memory_manager.drop_consumer(id, mem_used)
79+
}
80+
81+
/// Grow tracker memory of `delta`
82+
pub fn grow_tracker_usage(&self, delta: usize) {
83+
self.memory_manager.grow_tracker_usage(delta)
84+
}
85+
86+
/// Shrink tracker memory of `delta`
87+
pub fn shrink_tracker_usage(&self, delta: usize) {
88+
self.memory_manager.shrink_tracker_usage(delta)
8189
}
8290
}
8391

datafusion/src/physical_plan/common.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use super::{RecordBatchStream, SendableRecordBatchStream};
2121
use crate::error::{DataFusionError, Result};
2222
use crate::execution::runtime_env::RuntimeEnv;
23-
use crate::physical_plan::metrics::BaselineMetrics;
23+
use crate::physical_plan::metrics::MemTrackingMetrics;
2424
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
2525
use arrow::compute::concat;
2626
use arrow::datatypes::{Schema, SchemaRef};
@@ -43,21 +43,23 @@ pub struct SizedRecordBatchStream {
4343
schema: SchemaRef,
4444
batches: Vec<Arc<RecordBatch>>,
4545
index: usize,
46-
baseline_metrics: BaselineMetrics,
46+
metrics: MemTrackingMetrics,
4747
}
4848

4949
impl SizedRecordBatchStream {
5050
/// Create a new RecordBatchIterator
5151
pub fn new(
5252
schema: SchemaRef,
5353
batches: Vec<Arc<RecordBatch>>,
54-
baseline_metrics: BaselineMetrics,
54+
metrics: MemTrackingMetrics,
5555
) -> Self {
56+
let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>();
57+
metrics.init_mem_used(size);
5658
SizedRecordBatchStream {
5759
schema,
5860
index: 0,
5961
batches,
60-
baseline_metrics,
62+
metrics,
6163
}
6264
}
6365
}
@@ -75,7 +77,7 @@ impl Stream for SizedRecordBatchStream {
7577
} else {
7678
None
7779
});
78-
self.baseline_metrics.record_poll(poll)
80+
self.metrics.record_poll(poll)
7981
}
8082
}
8183

datafusion/src/physical_plan/explain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc
3232

3333
use super::SendableRecordBatchStream;
3434
use crate::execution::runtime_env::RuntimeEnv;
35-
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
35+
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
3636
use async_trait::async_trait;
3737

3838
/// Explain execution plan operator. This operator contains the string
@@ -148,12 +148,12 @@ impl ExecutionPlan for ExplainExec {
148148
)?;
149149

150150
let metrics = ExecutionPlanMetricsSet::new();
151-
let baseline_metrics = BaselineMetrics::new(&metrics, partition);
151+
let tracking_metrics = MemTrackingMetrics::new(&metrics, partition);
152152

153153
Ok(Box::pin(SizedRecordBatchStream::new(
154154
self.schema.clone(),
155155
vec![Arc::new(record_batch)],
156-
baseline_metrics,
156+
tracking_metrics,
157157
)))
158158
}
159159

0 commit comments

Comments
 (0)