Skip to content

Commit 718a94b

Browse files
committed
Remove MemoryManager
1 parent e07add8 commit 718a94b

File tree

15 files changed

+107
-134
lines changed

15 files changed

+107
-134
lines changed

datafusion/core/src/execution/context.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ use crate::config::{
7878
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
7979
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
8080
};
81-
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry, MemoryManager};
81+
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
8282
use crate::physical_optimizer::enforcement::BasicEnforcement;
8383
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
8484
use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -99,6 +99,7 @@ use url::Url;
9999

100100
use crate::catalog::listing_schema::ListingSchemaProvider;
101101
use crate::datasource::object_store::ObjectStoreUrl;
102+
use crate::execution::memory_manager::MemoryPool;
102103
use uuid::Uuid;
103104

104105
use super::options::{
@@ -1961,9 +1962,9 @@ impl TaskContext {
19611962
self.task_id.clone()
19621963
}
19631964

1964-
/// Return the [`MemoryManager`] associated with this [TaskContext]
1965-
pub fn memory_manager(&self) -> &Arc<MemoryManager> {
1966-
&self.runtime.memory_manager
1965+
/// Return the [`MemoryPool`] associated with this [TaskContext]
1966+
pub fn memory_pool(&self) -> &Arc<dyn MemoryPool> {
1967+
&self.runtime.memory_pool
19671968
}
19681969

19691970
/// Return the [RuntimeEnv] associated with this [TaskContext]
@@ -2031,6 +2032,7 @@ mod tests {
20312032
use super::*;
20322033
use crate::assert_batches_eq;
20332034
use crate::execution::context::QueryPlanner;
2035+
use crate::execution::memory_manager::TrackedAllocation;
20342036
use crate::execution::runtime_env::RuntimeConfig;
20352037
use crate::physical_plan::expressions::AvgAccumulator;
20362038
use crate::test;
@@ -2056,20 +2058,23 @@ mod tests {
20562058
let ctx1 = SessionContext::new();
20572059

20582060
// configure with same memory / disk manager
2059-
let memory_manager = ctx1.runtime_env().memory_manager.clone();
2061+
let memory_pool = ctx1.runtime_env().memory_pool.clone();
2062+
2063+
let mut allocation = TrackedAllocation::new(&memory_pool, "test".to_string());
2064+
allocation.grow(100);
2065+
20602066
let disk_manager = ctx1.runtime_env().disk_manager.clone();
20612067

20622068
let ctx2 =
20632069
SessionContext::with_config_rt(SessionConfig::new(), ctx1.runtime_env());
20642070

2065-
assert!(std::ptr::eq(
2066-
Arc::as_ptr(&memory_manager),
2067-
Arc::as_ptr(&ctx1.runtime_env().memory_manager)
2068-
));
2069-
assert!(std::ptr::eq(
2070-
Arc::as_ptr(&memory_manager),
2071-
Arc::as_ptr(&ctx2.runtime_env().memory_manager)
2072-
));
2071+
assert_eq!(ctx1.runtime_env().memory_pool.allocated(), 100);
2072+
assert_eq!(ctx2.runtime_env().memory_pool.allocated(), 100);
2073+
2074+
drop(allocation);
2075+
2076+
assert_eq!(ctx1.runtime_env().memory_pool.allocated(), 0);
2077+
assert_eq!(ctx2.runtime_env().memory_pool.allocated(), 0);
20732078

20742079
assert!(std::ptr::eq(
20752080
Arc::as_ptr(&disk_manager),

datafusion/core/src/execution/memory_manager/mod.rs

Lines changed: 23 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -50,45 +50,6 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
5050
fn allocated(&self) -> usize;
5151
}
5252

53-
/// A cooperative MemoryManager which tracks memory in a cooperative fashion.
54-
/// `ExecutionPlan` nodes such as `SortExec`, which require large amounts of memory
55-
/// register their memory requests with the MemoryManager which then tracks the total
56-
/// memory that has been allocated across all such nodes.
57-
///
58-
/// The associated [`MemoryPool`] determines how to respond to memory allocation
59-
/// requests, and any associated fairness control
60-
#[derive(Debug)]
61-
pub struct MemoryManager {
62-
pool: Arc<dyn MemoryPool>,
63-
}
64-
65-
impl MemoryManager {
66-
/// Create new memory manager based on the configuration
67-
pub fn new(pool: Arc<dyn MemoryPool>) -> Self {
68-
Self { pool }
69-
}
70-
71-
/// Returns the number of allocated bytes
72-
///
73-
/// Note: this can exceed the pool size as a result of [`MemoryManager::allocate`]
74-
pub fn allocated(&self) -> usize {
75-
self.pool.allocated()
76-
}
77-
78-
/// Returns a new empty allocation identified by `name`
79-
pub fn new_allocation(&self, name: String) -> TrackedAllocation {
80-
self.new_allocation_with_options(AllocationOptions::new(name))
81-
}
82-
83-
/// Returns a new empty allocation with the provided [`AllocationOptions`]
84-
pub fn new_allocation_with_options(
85-
&self,
86-
options: AllocationOptions,
87-
) -> TrackedAllocation {
88-
TrackedAllocation::new_empty(options, Arc::clone(&self.pool))
89-
}
90-
}
91-
9253
/// Options associated with a [`TrackedAllocation`]
9354
#[derive(Debug)]
9455
pub struct AllocationOptions {
@@ -131,12 +92,21 @@ pub struct TrackedAllocation {
13192
}
13293

13394
impl TrackedAllocation {
134-
fn new_empty(options: AllocationOptions, policy: Arc<dyn MemoryPool>) -> Self {
135-
policy.allocate(&options);
95+
/// Create a new [`TrackedAllocation`] in the provided [`MemoryPool`]
96+
pub fn new(pool: &Arc<dyn MemoryPool>, name: String) -> Self {
97+
Self::new_with_options(pool, AllocationOptions::new(name))
98+
}
99+
100+
/// Create a new [`TrackedAllocation`] in the provided [`MemoryPool`]
101+
pub fn new_with_options(
102+
pool: &Arc<dyn MemoryPool>,
103+
options: AllocationOptions,
104+
) -> Self {
105+
pool.allocate(&options);
136106
Self {
137107
options,
138108
size: 0,
139-
policy,
109+
policy: Arc::clone(pool),
140110
}
141111
}
142112

@@ -231,31 +201,30 @@ mod tests {
231201

232202
#[test]
233203
fn test_memory_manager_underflow() {
234-
let policy = Arc::new(GreedyMemoryPool::new(50));
235-
let manager = MemoryManager::new(policy);
236-
let mut a1 = manager.new_allocation("a1".to_string());
237-
assert_eq!(manager.allocated(), 0);
204+
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
205+
let mut a1 = TrackedAllocation::new(&pool, "a1".to_string());
206+
assert_eq!(pool.allocated(), 0);
238207

239208
a1.grow(100);
240-
assert_eq!(manager.allocated(), 100);
209+
assert_eq!(pool.allocated(), 100);
241210

242211
assert_eq!(a1.free(), 100);
243-
assert_eq!(manager.allocated(), 0);
212+
assert_eq!(pool.allocated(), 0);
244213

245214
a1.try_grow(100).unwrap_err();
246-
assert_eq!(manager.allocated(), 0);
215+
assert_eq!(pool.allocated(), 0);
247216

248217
a1.try_grow(30).unwrap();
249-
assert_eq!(manager.allocated(), 30);
218+
assert_eq!(pool.allocated(), 30);
250219

251-
let mut a2 = manager.new_allocation("a2".to_string());
220+
let mut a2 = TrackedAllocation::new(&pool, "a2".to_string());
252221
a2.try_grow(25).unwrap_err();
253-
assert_eq!(manager.allocated(), 30);
222+
assert_eq!(pool.allocated(), 30);
254223

255224
drop(a1);
256-
assert_eq!(manager.allocated(), 0);
225+
assert_eq!(pool.allocated(), 0);
257226

258227
a2.try_grow(25).unwrap();
259-
assert_eq!(manager.allocated(), 25);
228+
assert_eq!(pool.allocated(), 25);
260229
}
261230
}

datafusion/core/src/execution/memory_manager/pool.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::execution::memory_manager::{AllocationOptions, MemoryPool};
19-
use crate::execution::TrackedAllocation;
18+
use crate::execution::memory_manager::{
19+
AllocationOptions, MemoryPool, TrackedAllocation,
20+
};
2021
use datafusion_common::{DataFusionError, Result};
2122
use parking_lot::Mutex;
2223
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -220,24 +221,23 @@ fn insufficient_capacity_err(
220221
mod tests {
221222
use super::*;
222223
use crate::execution::memory_manager::AllocationOptions;
223-
use crate::execution::MemoryManager;
224224
use std::sync::Arc;
225225

226226
#[test]
227227
fn test_fair() {
228-
let manager = MemoryManager::new(Arc::new(FairSpillPool::new(100)));
228+
let pool = Arc::new(FairSpillPool::new(100)) as _;
229229

230-
let mut a1 = manager.new_allocation("unspillable".to_string());
230+
let mut a1 = TrackedAllocation::new(&pool, "unspillable".to_string());
231231
// Can grow beyond capacity of pool
232232
a1.grow(2000);
233-
assert_eq!(manager.allocated(), 2000);
233+
assert_eq!(pool.allocated(), 2000);
234234

235235
let options = AllocationOptions::new("s1".to_string()).with_can_spill(true);
236-
let mut a2 = manager.new_allocation_with_options(options);
236+
let mut a2 = TrackedAllocation::new_with_options(&pool, options);
237237
// Can grow beyond capacity of pool
238238
a2.grow(2000);
239239

240-
assert_eq!(manager.allocated(), 4000);
240+
assert_eq!(pool.allocated(), 4000);
241241

242242
let err = a2.try_grow(1).unwrap_err().to_string();
243243
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for s1 with 2000 bytes already allocated - maximum available is 0");
@@ -248,23 +248,23 @@ mod tests {
248248
a1.shrink(1990);
249249
a2.shrink(2000);
250250

251-
assert_eq!(manager.allocated(), 10);
251+
assert_eq!(pool.allocated(), 10);
252252

253253
a1.try_grow(10).unwrap();
254-
assert_eq!(manager.allocated(), 20);
254+
assert_eq!(pool.allocated(), 20);
255255

256256
// Can grow a2 to 80 as only spilling consumer
257257
a2.try_grow(80).unwrap();
258-
assert_eq!(manager.allocated(), 100);
258+
assert_eq!(pool.allocated(), 100);
259259

260260
a2.shrink(70);
261261

262262
assert_eq!(a1.size(), 20);
263263
assert_eq!(a2.size(), 10);
264-
assert_eq!(manager.allocated(), 30);
264+
assert_eq!(pool.allocated(), 30);
265265

266266
let options = AllocationOptions::new("s2".to_string()).with_can_spill(true);
267-
let mut a3 = manager.new_allocation_with_options(options);
267+
let mut a3 = TrackedAllocation::new_with_options(&pool, options);
268268

269269
let err = a3.try_grow(70).unwrap_err().to_string();
270270
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for s2 with 0 bytes already allocated - maximum available is 40");
@@ -276,7 +276,7 @@ mod tests {
276276

277277
// But dropping a2 does
278278
drop(a2);
279-
assert_eq!(manager.allocated(), 20);
279+
assert_eq!(pool.allocated(), 20);
280280
a3.try_grow(80).unwrap();
281281
}
282282
}

datafusion/core/src/execution/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,4 @@ pub mod registry;
4848
pub mod runtime_env;
4949

5050
pub use disk_manager::DiskManager;
51-
pub use memory_manager::{human_readable_size, MemoryManager, TrackedAllocation};
5251
pub use registry::FunctionRegistry;

datafusion/core/src/execution/runtime_env.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use crate::datasource::object_store::ObjectStoreRegistry;
3030
use crate::execution::memory_manager::{
3131
GreedyMemoryPool, MemoryPool, UnboundedMemoryPool,
3232
};
33-
use crate::execution::MemoryManager;
3433
use datafusion_common::DataFusionError;
3534
use object_store::ObjectStore;
3635
use std::fmt::{Debug, Formatter};
@@ -42,7 +41,7 @@ use url::Url;
4241
/// Execution runtime environment.
4342
pub struct RuntimeEnv {
4443
/// Runtime memory management
45-
pub memory_manager: Arc<MemoryManager>,
44+
pub memory_pool: Arc<dyn MemoryPool>,
4645
/// Manage temporary files during query execution
4746
pub disk_manager: Arc<DiskManager>,
4847
/// Object Store Registry
@@ -67,11 +66,11 @@ impl RuntimeEnv {
6766
table_factories,
6867
} = config;
6968

70-
let pool =
69+
let memory_pool =
7170
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
7271

7372
Ok(Self {
74-
memory_manager: Arc::new(MemoryManager::new(pool)),
73+
memory_pool,
7574
disk_manager: DiskManager::try_new(disk_manager)?,
7675
object_store_registry,
7776
table_factories,

datafusion/core/src/physical_plan/aggregates/hash.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,10 @@ impl GroupedHashAggregateStream {
124124

125125
timer.done();
126126

127-
let allocation = context
128-
.memory_manager()
129-
.new_allocation(format!("GroupedHashAggregateStream[{}]", partition));
127+
let allocation = TrackedAllocation::new(
128+
context.memory_pool(),
129+
format!("GroupedHashAggregateStream[{}]", partition),
130+
);
130131

131132
let inner = GroupedHashAggregateStreamInner {
132133
schema: Arc::clone(&schema),

datafusion/core/src/physical_plan/aggregates/no_grouping.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ impl AggregateStream {
7272
let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
7373
let accumulators = create_accumulators(&aggr_expr)?;
7474

75-
let allocation = context
76-
.memory_manager()
77-
.new_allocation(format!("AggregateStream[{}]", partition));
75+
let allocation = TrackedAllocation::new(
76+
context.memory_pool(),
77+
format!("AggregateStream[{}]", partition),
78+
);
7879

7980
let inner = AggregateStreamInner {
8081
schema: Arc::clone(&schema),

datafusion/core/src/physical_plan/aggregates/row_hash.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,10 @@ impl GroupedHashAggregateStreamV2 {
139139
let aggr_schema = aggr_state_schema(&aggr_expr)?;
140140

141141
let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
142-
let allocation = context
143-
.memory_manager()
144-
.new_allocation(format!("GroupedHashAggregateStreamV2[{}]", partition));
142+
let allocation = TrackedAllocation::new(
143+
context.memory_pool(),
144+
format!("GroupedHashAggregateStreamV2[{}]", partition),
145+
);
145146

146147
let aggr_state = AggregationState {
147148
allocation,

datafusion/core/src/physical_plan/explain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ impl ExecutionPlan for ExplainExec {
153153

154154
let metrics = ExecutionPlanMetricsSet::new();
155155
let tracking_metrics =
156-
MemTrackingMetrics::new(&metrics, context.memory_manager(), partition);
156+
MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
157157

158158
debug!(
159159
"Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

datafusion/core/src/physical_plan/metrics/composite.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Metrics common for complex operators with multiple steps.
1919
20-
use crate::execution::MemoryManager;
20+
use crate::execution::memory_manager::MemoryPool;
2121
use crate::physical_plan::metrics::tracker::MemTrackingMetrics;
2222
use crate::physical_plan::metrics::{
2323
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time,
@@ -69,18 +69,18 @@ impl CompositeMetricsSet {
6969
pub fn new_intermediate_tracking(
7070
&self,
7171
partition: usize,
72-
memory_manager: &MemoryManager,
72+
pool: &Arc<dyn MemoryPool>,
7373
) -> MemTrackingMetrics {
74-
MemTrackingMetrics::new(&self.mid, memory_manager, partition)
74+
MemTrackingMetrics::new(&self.mid, pool, partition)
7575
}
7676

7777
/// create a new final memory tracking metrics
7878
pub fn new_final_tracking(
7979
&self,
8080
partition: usize,
81-
memory_manager: &MemoryManager,
81+
pool: &Arc<dyn MemoryPool>,
8282
) -> MemTrackingMetrics {
83-
MemTrackingMetrics::new(&self.final_, memory_manager, partition)
83+
MemTrackingMetrics::new(&self.final_, pool, partition)
8484
}
8585

8686
fn merge_compute_time(&self, dest: &Time) {

0 commit comments

Comments
 (0)