Skip to content

Commit 3664766

Browse files
authored
use Weak ptr to break catalog list <> info schema cyclic reference (#681)
Fixes #680.
1 parent 0368f59 commit 3664766

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

datafusion/src/catalog/information_schema.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
//!
2020
//! [Information Schema](https://en.wikipedia.org/wiki/Information_schema)
2121
22-
use std::{any, sync::Arc};
22+
use std::{
23+
any,
24+
sync::{Arc, Weak},
25+
};
2326

2427
use arrow::{
2528
array::{StringBuilder, UInt64Builder},
@@ -41,14 +44,14 @@ const COLUMNS: &str = "columns";
4144
/// Wraps another [`CatalogProvider`] and adds an "information_schema"
4245
/// schema that can introspect on tables in the catalog_list
4346
pub(crate) struct CatalogWithInformationSchema {
44-
catalog_list: Arc<dyn CatalogList>,
47+
catalog_list: Weak<dyn CatalogList>,
4548
/// wrapped provider
4649
inner: Arc<dyn CatalogProvider>,
4750
}
4851

4952
impl CatalogWithInformationSchema {
5053
pub(crate) fn new(
51-
catalog_list: Arc<dyn CatalogList>,
54+
catalog_list: Weak<dyn CatalogList>,
5255
inner: Arc<dyn CatalogProvider>,
5356
) -> Self {
5457
Self {
@@ -73,9 +76,10 @@ impl CatalogProvider for CatalogWithInformationSchema {
7376

7477
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
7578
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
76-
Some(Arc::new(InformationSchemaProvider {
77-
catalog_list: self.catalog_list.clone(),
78-
}))
79+
Weak::upgrade(&self.catalog_list).map(|catalog_list| {
80+
Arc::new(InformationSchemaProvider { catalog_list })
81+
as Arc<dyn SchemaProvider>
82+
})
7983
} else {
8084
self.inner.schema(name)
8185
}

datafusion/src/execution/context.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ impl ExecutionContext {
144144

145145
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
146146
Arc::new(CatalogWithInformationSchema::new(
147-
catalog_list.clone(),
147+
Arc::downgrade(&catalog_list),
148148
Arc::new(default_catalog),
149149
))
150150
} else {
@@ -346,7 +346,7 @@ impl ExecutionContext {
346346
let state = self.state.lock().unwrap();
347347
let catalog = if state.config.information_schema {
348348
Arc::new(CatalogWithInformationSchema::new(
349-
state.catalog_list.clone(),
349+
Arc::downgrade(&state.catalog_list),
350350
catalog,
351351
))
352352
} else {
@@ -924,6 +924,7 @@ mod tests {
924924
use arrow::datatypes::*;
925925
use arrow::record_batch::RecordBatch;
926926
use std::fs::File;
927+
use std::sync::Weak;
927928
use std::thread::{self, JoinHandle};
928929
use std::{io::prelude::*, sync::Mutex};
929930
use tempfile::TempDir;
@@ -3364,6 +3365,29 @@ mod tests {
33643365
assert_batches_sorted_eq!(expected, &result);
33653366
}
33663367

3368+
#[tokio::test]
3369+
async fn catalogs_not_leaked() {
3370+
// the information schema used to introduce cyclic Arcs
3371+
let ctx = ExecutionContext::with_config(
3372+
ExecutionConfig::new().with_information_schema(true),
3373+
);
3374+
3375+
// register a single catalog
3376+
let catalog = Arc::new(MemoryCatalogProvider::new());
3377+
let catalog_weak = Arc::downgrade(&catalog);
3378+
ctx.register_catalog("my_catalog", catalog);
3379+
3380+
let catalog_list_weak = {
3381+
let state = ctx.state.lock().unwrap();
3382+
Arc::downgrade(&state.catalog_list)
3383+
};
3384+
3385+
drop(ctx);
3386+
3387+
assert_eq!(Weak::strong_count(&catalog_list_weak), 0);
3388+
assert_eq!(Weak::strong_count(&catalog_weak), 0);
3389+
}
3390+
33673391
struct MyPhysicalPlanner {}
33683392

33693393
impl PhysicalPlanner for MyPhysicalPlanner {

0 commit comments

Comments
 (0)