Skip to content

Commit a0d6b2f

Browse files
author
Brent Gardner
authored
Update TableProviderFactory trait to support real-world use-cases (#3867)
* Add failing test
* passing test
* Restore cargo.tomls
* Move CustomTable into Deltalake
* Fix manifest
* Minimal code to pass dynamic delta table registration
* Fix manifest
* Fix build?
1 parent 62aeb75 commit a0d6b2f

File tree

5 files changed

+57
-29
lines changed

5 files changed

+57
-29
lines changed

datafusion/core/src/datasource/datasource.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ pub trait TableProvider: Sync + Send {
7676
/// A factory which creates [`TableProvider`]s at runtime given a URL.
7777
///
7878
/// For example, this can be used to create a table "on the fly"
79-
/// from a directory of files only when that name is referenced.
79+
/// from a directory of files only when that name is referenced.
80+
#[async_trait]
8081
pub trait TableProviderFactory: Sync + Send {
8182
/// Create a TableProvider given name and url
82-
fn create(&self, name: &str, url: &str) -> Arc<dyn TableProvider>;
83+
async fn create(&self, name: &str, url: &str) -> Result<Arc<dyn TableProvider>>;
8384
}

datafusion/core/src/execution/context.rs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ use crate::config::{
8080
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
8181
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
8282
};
83-
use crate::datasource::datasource::TableProviderFactory;
8483
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
8584
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
8685
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -159,8 +158,6 @@ pub struct SessionContext {
159158
pub session_start_time: DateTime<Utc>,
160159
/// Shared session state for the session
161160
pub state: Arc<RwLock<SessionState>>,
162-
/// Dynamic table providers
163-
pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
164161
}
165162

166163
impl Default for SessionContext {
@@ -188,7 +185,6 @@ impl SessionContext {
188185
session_id: state.session_id.clone(),
189186
session_start_time: chrono::Utc::now(),
190187
state: Arc::new(RwLock::new(state)),
191-
table_factories: HashMap::default(),
192188
}
193189
}
194190

@@ -198,19 +194,9 @@ impl SessionContext {
198194
session_id: state.session_id.clone(),
199195
session_start_time: chrono::Utc::now(),
200196
state: Arc::new(RwLock::new(state)),
201-
table_factories: HashMap::default(),
202197
}
203198
}
204199

205-
/// Register a `TableProviderFactory` for a given `file_type` identifier
206-
pub fn register_table_factory(
207-
&mut self,
208-
file_type: &str,
209-
factory: Arc<dyn TableProviderFactory>,
210-
) {
211-
self.table_factories.insert(file_type.to_string(), factory);
212-
}
213-
214200
/// Registers the [`RecordBatch`] as the specified table name
215201
pub fn register_batch(
216202
&self,
@@ -431,13 +417,20 @@ impl SessionContext {
431417
&self,
432418
cmd: &CreateExternalTable,
433419
) -> Result<Arc<DataFrame>> {
434-
let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
435-
DataFusionError::Execution(format!(
436-
"Unable to find factory for {}",
437-
cmd.file_type
438-
))
439-
})?;
440-
let table = (*factory).create(cmd.name.as_str(), cmd.location.as_str());
420+
let state = self.state.read().clone();
421+
let factory = &state
422+
.runtime_env
423+
.table_factories
424+
.get(&cmd.file_type)
425+
.ok_or_else(|| {
426+
DataFusionError::Execution(format!(
427+
"Unable to find factory for {}",
428+
cmd.file_type
429+
))
430+
})?;
431+
let table = (*factory)
432+
.create(cmd.name.as_str(), cmd.location.as_str())
433+
.await?;
441434
self.register_table(cmd.name.as_str(), table)?;
442435
let plan = LogicalPlanBuilder::empty(false).build()?;
443436
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))

datafusion/core/src/execution/runtime_env.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use crate::{
2525
memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
2626
},
2727
};
28+
use std::collections::HashMap;
2829

30+
use crate::datasource::datasource::TableProviderFactory;
2931
use crate::datasource::object_store::ObjectStoreRegistry;
3032
use datafusion_common::DataFusionError;
3133
use object_store::ObjectStore;
@@ -43,6 +45,8 @@ pub struct RuntimeEnv {
4345
pub disk_manager: Arc<DiskManager>,
4446
/// Object Store Registry
4547
pub object_store_registry: Arc<ObjectStoreRegistry>,
48+
/// Registered [`TableProviderFactory`]s, keyed by the file type they handle
49+
pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
4650
}
4751

4852
impl Debug for RuntimeEnv {
@@ -58,12 +62,14 @@ impl RuntimeEnv {
5862
memory_manager,
5963
disk_manager,
6064
object_store_registry,
65+
table_factories,
6166
} = config;
6267

6368
Ok(Self {
6469
memory_manager: MemoryManager::new(memory_manager),
6570
disk_manager: DiskManager::try_new(disk_manager)?,
6671
object_store_registry,
72+
table_factories,
6773
})
6874
}
6975

@@ -87,7 +93,7 @@ impl RuntimeEnv {
8793
self.memory_manager.shrink_tracker_usage(delta)
8894
}
8995

90-
/// Registers a object store with scheme using a custom `ObjectStore` so that
96+
/// Registers an object store with scheme using a custom `ObjectStore` so that
9197
/// an external file system or object storage system could be used against this context.
9298
///
9399
/// Returns the `ObjectStore` previously registered for this scheme, if any
@@ -101,6 +107,14 @@ impl RuntimeEnv {
101107
.register_store(scheme, host, object_store)
102108
}
103109

110+
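/// Registers additional [`TableProviderFactory`]s, keyed by file type; a factory already registered under the same key is replaced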
111+
pub fn register_table_factories(
112+
&mut self,
113+
table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
114+
) {
115+
self.table_factories.extend(table_factories)
116+
}
117+
104118
/// Retrieves an `ObjectStore` instance for a URL
105119
pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
106120
self.object_store_registry
@@ -124,6 +138,8 @@ pub struct RuntimeConfig {
124138
pub memory_manager: MemoryManagerConfig,
125139
/// ObjectStoreRegistry to get object store based on url
126140
pub object_store_registry: Arc<ObjectStoreRegistry>,
141+
/// Custom table factories for things like deltalake that are not part of core datafusion
142+
pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
127143
}
128144

129145
impl RuntimeConfig {
@@ -153,6 +169,15 @@ impl RuntimeConfig {
153169
self
154170
}
155171

172+
/// Customize the table provider factories
173+
pub fn with_table_factories(
174+
mut self,
175+
table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
176+
) -> Self {
177+
self.table_factories = table_factories;
178+
self
179+
}
180+
156181
/// Specify the total memory to use while running the DataFusion
157182
/// plan to `max_memory * memory_fraction` in bytes.
158183
///

datafusion/core/tests/sql/create_drop.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
use async_trait::async_trait;
1919
use std::any::Any;
20+
use std::collections::HashMap;
2021
use std::io::Write;
2122

2223
use datafusion::datasource::datasource::TableProviderFactory;
2324
use datafusion::execution::context::SessionState;
25+
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
2426
use datafusion_expr::TableType;
2527
use tempfile::TempDir;
2628

@@ -398,16 +400,22 @@ impl TableProvider for TestTableProvider {
398400

399401
struct TestTableFactory {}
400402

403+
#[async_trait]
401404
impl TableProviderFactory for TestTableFactory {
402-
fn create(&self, _name: &str, _path: &str) -> Arc<dyn TableProvider> {
403-
Arc::new(TestTableProvider {})
405+
async fn create(&self, _name: &str, _url: &str) -> Result<Arc<dyn TableProvider>> {
406+
Ok(Arc::new(TestTableProvider {}))
404407
}
405408
}
406409

407410
#[tokio::test]
408411
async fn create_custom_table() -> Result<()> {
409-
let mut ctx = SessionContext::new();
410-
ctx.register_table_factory("DELTATABLE", Arc::new(TestTableFactory {}));
412+
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
413+
HashMap::new();
414+
table_factories.insert("DELTATABLE".to_string(), Arc::new(TestTableFactory {}));
415+
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
416+
let env = RuntimeEnv::new(cfg).unwrap();
417+
let ses = SessionConfig::new();
418+
let ctx = SessionContext::with_config_rt(ses, Arc::new(env));
411419

412420
let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
413421
ctx.sql(sql).await.unwrap();

datafusion/proto/src/logical_plan.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,8 @@ impl AsLogicalPlan for LogicalPlanNode {
469469
match create_extern_table.file_type.as_str() {
470470
"CSV" | "JSON" | "PARQUET" | "AVRO" => {}
471471
it => {
472-
if !ctx.table_factories.contains_key(it) {
472+
let env = &ctx.state.as_ref().read().runtime_env;
473+
if !env.table_factories.contains_key(it) {
473474
Err(DataFusionError::Internal(format!(
474475
"No TableProvider for file type: {}",
475476
it

0 commit comments

Comments
 (0)