Skip to content

Commit 6859b93

Browse files
authored
refactor: move CteWorkTable, default_table_source a bunch of files out of core (#15316)
* First Iteration * fix: CI tests * stable waypoint, documentation pending, memory moce pending * stable waypoint 2: add document pending, get a heads up pending * pushing for test and review * fix:mock in test * fix:cliipy
1 parent 8756197 commit 6859b93

File tree

15 files changed

+189
-123
lines changed

15 files changed

+189
-123
lines changed

datafusion/catalog-listing/src/helpers.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,5 +1079,9 @@ mod tests {
10791079
fn table_options_mut(&mut self) -> &mut TableOptions {
10801080
unimplemented!()
10811081
}
1082+
1083+
fn task_ctx(&self) -> Arc<datafusion_execution::TaskContext> {
1084+
unimplemented!()
1085+
}
10821086
}
10831087
}

datafusion/core/src/datasource/cte_worktable.rs renamed to datafusion/catalog/src/cte_worktable.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,17 @@
2020
use std::sync::Arc;
2121
use std::{any::Any, borrow::Cow};
2222

23+
use crate::Session;
2324
use arrow::datatypes::SchemaRef;
2425
use async_trait::async_trait;
25-
use datafusion_catalog::Session;
2626
use datafusion_physical_plan::work_table::WorkTableExec;
2727

28-
use crate::{
29-
error::Result,
30-
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
31-
physical_plan::ExecutionPlan,
32-
};
28+
use datafusion_physical_plan::ExecutionPlan;
3329

34-
use crate::datasource::{TableProvider, TableType};
30+
use datafusion_common::error::Result;
31+
use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType};
32+
33+
use crate::TableProvider;
3534

3635
/// The temporary working table where the previous iteration of a recursive query is stored
3736
/// Naming is based on PostgreSQL's implementation.

datafusion/core/src/datasource/default_table_source.rs renamed to datafusion/catalog/src/default_table_source.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use std::sync::Arc;
2121
use std::{any::Any, borrow::Cow};
2222

23-
use crate::datasource::TableProvider;
23+
use crate::TableProvider;
2424

2525
use arrow::datatypes::SchemaRef;
2626
use datafusion_common::{internal_err, Constraints};
@@ -133,7 +133,7 @@ fn preserves_table_type() {
133133

134134
async fn scan(
135135
&self,
136-
_: &dyn datafusion_catalog::Session,
136+
_: &dyn crate::Session,
137137
_: Option<&Vec<usize>>,
138138
_: &[Expr],
139139
_: Option<usize>,

datafusion/catalog/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub use r#async::*;
4949
pub use schema::*;
5050
pub use session::*;
5151
pub use table::*;
52+
pub mod cte_worktable;
53+
pub mod default_table_source;
5254
pub mod stream;
5355
pub mod streaming;
5456
pub mod view;

datafusion/catalog/src/memory.rs renamed to datafusion/catalog/src/memory/catalog.rs

Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
//! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory
1919
//! implementations of [`CatalogProviderList`] and [`CatalogProvider`].
2020
21-
use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
22-
use async_trait::async_trait;
21+
use crate::{CatalogProvider, CatalogProviderList, SchemaProvider};
2322
use dashmap::DashMap;
24-
use datafusion_common::{exec_err, DataFusionError};
23+
use datafusion_common::exec_err;
2524
use std::any::Any;
2625
use std::sync::Arc;
2726

@@ -134,67 +133,3 @@ impl CatalogProvider for MemoryCatalogProvider {
134133
}
135134
}
136135
}
137-
138-
/// Simple in-memory implementation of a schema.
139-
#[derive(Debug)]
140-
pub struct MemorySchemaProvider {
141-
tables: DashMap<String, Arc<dyn TableProvider>>,
142-
}
143-
144-
impl MemorySchemaProvider {
145-
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
146-
pub fn new() -> Self {
147-
Self {
148-
tables: DashMap::new(),
149-
}
150-
}
151-
}
152-
153-
impl Default for MemorySchemaProvider {
154-
fn default() -> Self {
155-
Self::new()
156-
}
157-
}
158-
159-
#[async_trait]
160-
impl SchemaProvider for MemorySchemaProvider {
161-
fn as_any(&self) -> &dyn Any {
162-
self
163-
}
164-
165-
fn table_names(&self) -> Vec<String> {
166-
self.tables
167-
.iter()
168-
.map(|table| table.key().clone())
169-
.collect()
170-
}
171-
172-
async fn table(
173-
&self,
174-
name: &str,
175-
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
176-
Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
177-
}
178-
179-
fn register_table(
180-
&self,
181-
name: String,
182-
table: Arc<dyn TableProvider>,
183-
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
184-
if self.table_exist(name.as_str()) {
185-
return exec_err!("The table {name} already exists");
186-
}
187-
Ok(self.tables.insert(name, table))
188-
}
189-
190-
fn deregister_table(
191-
&self,
192-
name: &str,
193-
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
194-
Ok(self.tables.remove(name).map(|(_, table)| table))
195-
}
196-
197-
fn table_exist(&self, name: &str) -> bool {
198-
self.tables.contains_key(name)
199-
}
200-
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub(crate) mod catalog;
19+
pub(crate) mod schema;
20+
21+
pub use catalog::*;
22+
pub use schema::*;
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`MemorySchemaProvider`]: In-memory implementations of [`SchemaProvider`].
19+
20+
use crate::{SchemaProvider, TableProvider};
21+
use async_trait::async_trait;
22+
use dashmap::DashMap;
23+
use datafusion_common::{exec_err, DataFusionError};
24+
use std::any::Any;
25+
use std::sync::Arc;
26+
27+
/// Simple in-memory implementation of a schema.
28+
#[derive(Debug)]
29+
pub struct MemorySchemaProvider {
30+
tables: DashMap<String, Arc<dyn TableProvider>>,
31+
}
32+
33+
impl MemorySchemaProvider {
34+
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
35+
pub fn new() -> Self {
36+
Self {
37+
tables: DashMap::new(),
38+
}
39+
}
40+
}
41+
42+
impl Default for MemorySchemaProvider {
43+
fn default() -> Self {
44+
Self::new()
45+
}
46+
}
47+
48+
#[async_trait]
49+
impl SchemaProvider for MemorySchemaProvider {
50+
fn as_any(&self) -> &dyn Any {
51+
self
52+
}
53+
54+
fn table_names(&self) -> Vec<String> {
55+
self.tables
56+
.iter()
57+
.map(|table| table.key().clone())
58+
.collect()
59+
}
60+
61+
async fn table(
62+
&self,
63+
name: &str,
64+
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
65+
Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
66+
}
67+
68+
fn register_table(
69+
&self,
70+
name: String,
71+
table: Arc<dyn TableProvider>,
72+
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
73+
if self.table_exist(name.as_str()) {
74+
return exec_err!("The table {name} already exists");
75+
}
76+
Ok(self.tables.insert(name, table))
77+
}
78+
79+
fn deregister_table(
80+
&self,
81+
name: &str,
82+
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
83+
Ok(self.tables.remove(name).map(|(_, table)| table))
84+
}
85+
86+
fn table_exist(&self, name: &str) -> bool {
87+
self.tables.contains_key(name)
88+
}
89+
}

datafusion/catalog/src/session.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ pub trait Session: Send + Sync {
132132

133133
/// Returns a mutable reference to [`TableOptions`]
134134
fn table_options_mut(&mut self) -> &mut TableOptions;
135+
136+
/// Get a new TaskContext to run in this session
137+
fn task_ctx(&self) -> Arc<TaskContext>;
135138
}
136139

137140
/// Create a new task context instance from Session

datafusion/core/src/datasource/memory.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use std::sync::Arc;
2424

2525
use crate::datasource::{TableProvider, TableType};
2626
use crate::error::Result;
27-
use crate::execution::context::SessionState;
2827
use crate::logical_expr::Expr;
2928
use crate::physical_plan::insert::{DataSink, DataSinkExec};
3029
use crate::physical_plan::repartition::RepartitionExec;
@@ -129,7 +128,7 @@ impl MemTable {
129128
pub async fn load(
130129
t: Arc<dyn TableProvider>,
131130
output_partitions: Option<usize>,
132-
state: &SessionState,
131+
state: &dyn Session,
133132
) -> Result<Self> {
134133
let schema = t.schema();
135134
let constraints = t.constraints();
@@ -267,6 +266,8 @@ impl TableProvider for MemTable {
267266
/// # Returns
268267
///
269268
/// * A plan that returns the number of rows written.
269+
///
270+
/// [`SessionState`]: crate::execution::context::SessionState
270271
async fn insert_into(
271272
&self,
272273
_state: &dyn Session,

datafusion/core/src/datasource/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
//!
2020
//! [`ListingTable`]: crate::datasource::listing::ListingTable
2121
22-
pub mod cte_worktable;
23-
pub mod default_table_source;
2422
pub mod dynamic_file;
2523
pub mod empty;
2624
pub mod file_format;
@@ -32,11 +30,6 @@ pub mod provider;
3230
mod statistics;
3331
mod view_test;
3432

35-
pub use datafusion_catalog::stream;
36-
pub use datafusion_catalog::view;
37-
pub use datafusion_datasource::schema_adapter;
38-
pub use datafusion_datasource::source;
39-
4033
// backwards compatibility
4134
pub use self::default_table_source::{
4235
provider_as_source, source_as_provider, DefaultTableSource,
@@ -45,6 +38,12 @@ pub use self::memory::MemTable;
4538
pub use self::view::ViewTable;
4639
pub use crate::catalog::TableProvider;
4740
pub use crate::logical_expr::TableType;
41+
pub use datafusion_catalog::cte_worktable;
42+
pub use datafusion_catalog::default_table_source;
43+
pub use datafusion_catalog::stream;
44+
pub use datafusion_catalog::view;
45+
pub use datafusion_datasource::schema_adapter;
46+
pub use datafusion_datasource::source;
4847
pub use datafusion_execution::object_store;
4948
pub use datafusion_physical_expr::create_ordering;
5049
pub use statistics::get_statistics_with_limit;

0 commit comments

Comments
 (0)