
Commit 761f384

Author: Brent Gardner
Commit message: Rebase
1 parent: a40d4e4

5 files changed (+110, -73 lines)

datafusion/core/src/datasource/datasource.rs

Lines changed: 2 additions & 2 deletions
@@ -74,8 +74,8 @@ pub trait TableProvider: Sync + Send {
 }
 
 /// A factory which creates [`TableProvider`]s at runtime given a URL.
-///
-/// For example, this can be used to create a table "on the fly"
+///
+/// For example, this can be used to create a table "on the fly"
 /// from a directory of files only when that name is referenced.
 pub trait TableProviderFactory: Sync + Send {
     /// Create a TableProvider given name and url
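To make the new hook concrete, here is a minimal sketch of a custom TableProviderFactory. The exact method signature is not shown in this hunk; the `fn create(&self, name, url) -> Arc<dyn TableProvider>` shape below is inferred from how create_custom_table calls it further down, and EmptyTableFactory and the MemTable it returns are purely illustrative.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::{MemTable, TableProvider};

/// Illustrative factory: ignores the URL and hands back an empty in-memory table.
struct EmptyTableFactory;

impl TableProviderFactory for EmptyTableFactory {
    // Signature inferred from create_custom_table's call site; an assumption, not shown in this diff.
    fn create(&self, _name: &str, _url: &str) -> Arc<dyn TableProvider> {
        // One nullable Int64 column, zero partitions.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        Arc::new(MemTable::try_new(schema, vec![]).expect("empty MemTable"))
    }
}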

datafusion/core/src/execution/context.rs

Lines changed: 98 additions & 64 deletions
@@ -68,7 +68,7 @@ use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{
     provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
-    CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
+    CreateMemoryTable, CreateView, DropTable, FunctionRegistry, LogicalPlan,
     LogicalPlanBuilder, UNNAMED_TABLE,
 };
 use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
@@ -90,6 +90,7 @@ use crate::config::{
     ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
     OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
 };
+use crate::datasource::datasource::TableProviderFactory;
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::logical_plan::plan::Explain;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -175,6 +176,8 @@ pub struct SessionContext {
     pub session_start_time: DateTime<Utc>,
     /// Shared session state for the session
     pub state: Arc<RwLock<SessionState>>,
+    /// Dynamic table providers
+    pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
 }
 
 impl Default for SessionContext {
@@ -202,6 +205,7 @@ impl SessionContext {
             session_id: state.session_id.clone(),
             session_start_time: chrono::Utc::now(),
             state: Arc::new(RwLock::new(state)),
+            table_factories: HashMap::default(),
         }
     }
 
@@ -211,9 +215,19 @@ impl SessionContext {
             session_id: state.session_id.clone(),
             session_start_time: chrono::Utc::now(),
             state: Arc::new(RwLock::new(state)),
+            table_factories: HashMap::default(),
         }
     }
 
+    /// Register a `TableProviderFactory` for a given `file_type` identifier
+    pub fn register_table_factory(
+        &mut self,
+        file_type: &str,
+        factory: Arc<dyn TableProviderFactory>,
+    ) {
+        self.table_factories.insert(file_type.to_string(), factory);
+    }
+
     /// Return the [RuntimeEnv] used to run queries with this [SessionContext]
     pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
         self.state.read().runtime_env.clone()
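As a usage sketch (not part of this commit), registering a factory and driving it from SQL might look like the following. The CUSTOMTABLE token, the /tmp/t location, and EmptyTableFactory (from the datasource.rs sketch above) are made-up names, and the assumption that the planner hands create_custom_table the upper-cased STORED AS token is inferred from the string match in the sql() dispatch below.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // register_table_factory takes &mut self, so the context must be mutable.
    let mut ctx = SessionContext::new();
    // Key the factory by the STORED AS token it should handle; EmptyTableFactory is
    // the illustrative struct from the earlier sketch.
    ctx.register_table_factory("CUSTOMTABLE", Arc::new(EmptyTableFactory));
    // Anything other than PARQUET/CSV/JSON/AVRO is routed to create_custom_table,
    // which looks the token up in `table_factories`.
    ctx.sql("CREATE EXTERNAL TABLE t STORED AS CUSTOMTABLE LOCATION '/tmp/t'")
        .await?;
    let df = ctx.sql("SELECT * FROM t").await?;
    df.show().await?;
    Ok(())
}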
@@ -236,70 +250,12 @@ impl SessionContext {
     pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
         let plan = self.create_logical_plan(sql)?;
         match plan {
-            LogicalPlan::CreateExternalTable(CreateExternalTable {
-                ref schema,
-                ref name,
-                ref location,
-                ref file_type,
-                ref has_header,
-                ref delimiter,
-                ref table_partition_cols,
-                ref if_not_exists,
-            }) => {
-                let (file_format, file_extension) = match file_type {
-                    FileType::CSV => (
-                        Arc::new(
-                            CsvFormat::default()
-                                .with_has_header(*has_header)
-                                .with_delimiter(*delimiter as u8),
-                        ) as Arc<dyn FileFormat>,
-                        DEFAULT_CSV_EXTENSION,
-                    ),
-                    FileType::Parquet => (
-                        Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
-                        DEFAULT_PARQUET_EXTENSION,
-                    ),
-                    FileType::Avro => (
-                        Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
-                        DEFAULT_AVRO_EXTENSION,
-                    ),
-                    FileType::NdJson => (
-                        Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
-                        DEFAULT_JSON_EXTENSION,
-                    ),
-                };
-                let table = self.table(name.as_str());
-                match (if_not_exists, table) {
-                    (true, Ok(_)) => self.return_empty_dataframe(),
-                    (_, Err(_)) => {
-                        // TODO make schema in CreateExternalTable optional instead of empty
-                        let provided_schema = if schema.fields().is_empty() {
-                            None
-                        } else {
-                            Some(Arc::new(schema.as_ref().to_owned().into()))
-                        };
-                        let options = ListingOptions {
-                            format: file_format,
-                            collect_stat: false,
-                            file_extension: file_extension.to_owned(),
-                            target_partitions: self.copied_config().target_partitions,
-                            table_partition_cols: table_partition_cols.clone(),
-                        };
-                        self.register_listing_table(
-                            name,
-                            location,
-                            options,
-                            provided_schema,
-                        )
-                        .await?;
-                        self.return_empty_dataframe()
-                    }
-                    (false, Ok(_)) => Err(DataFusionError::Execution(format!(
-                        "Table '{:?}' already exists",
-                        name
-                    ))),
+            LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
+                "PARQUET" | "CSV" | "JSON" | "AVRO" => {
+                    self.create_listing_table(&cmd).await
                 }
-            }
+                _ => self.create_custom_table(&cmd).await,
+            },
 
             LogicalPlan::CreateMemoryTable(CreateMemoryTable {
                 name,
@@ -480,6 +436,84 @@ impl SessionContext {
         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
     }
 
+    async fn create_custom_table(
+        &self,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<DataFrame>> {
+        let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "Unable to find factory for {}",
+                cmd.file_type
+            ))
+        })?;
+        let table = (*factory).create(cmd.name.as_str(), cmd.location.as_str());
+        self.register_table(cmd.name.as_str(), table)?;
+        let plan = LogicalPlanBuilder::empty(false).build()?;
+        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+    }
+
+    async fn create_listing_table(
+        &self,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<DataFrame>> {
+        let (file_format, file_extension) = match cmd.file_type.as_str() {
+            "CSV" => (
+                Arc::new(
+                    CsvFormat::default()
+                        .with_has_header(cmd.has_header)
+                        .with_delimiter(cmd.delimiter as u8),
+                ) as Arc<dyn FileFormat>,
+                DEFAULT_CSV_EXTENSION,
+            ),
+            "PARQUET" => (
+                Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
+                DEFAULT_PARQUET_EXTENSION,
+            ),
+            "AVRO" => (
+                Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
+                DEFAULT_AVRO_EXTENSION,
+            ),
+            "JSON" => (
+                Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
+                DEFAULT_JSON_EXTENSION,
+            ),
+            _ => Err(DataFusionError::Execution(
+                "Only known FileTypes can be ListingTables!".to_string(),
+            ))?,
+        };
+        let table = self.table(cmd.name.as_str());
+        match (cmd.if_not_exists, table) {
+            (true, Ok(_)) => self.return_empty_dataframe(),
+            (_, Err(_)) => {
+                // TODO make schema in CreateExternalTable optional instead of empty
+                let provided_schema = if cmd.schema.fields().is_empty() {
+                    None
+                } else {
+                    Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
+                };
+                let options = ListingOptions {
+                    format: file_format,
+                    collect_stat: false,
+                    file_extension: file_extension.to_owned(),
+                    target_partitions: self.copied_config().target_partitions,
+                    table_partition_cols: cmd.table_partition_cols.clone(),
+                };
+                self.register_listing_table(
+                    cmd.name.as_str(),
+                    cmd.location.clone(),
+                    options,
+                    provided_schema,
+                )
+                .await?;
+                self.return_empty_dataframe()
+            }
+            (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+                "Table '{:?}' already exists",
+                cmd.name
+            ))),
+        }
+    }
+
     fn find_and_deregister<'a>(
         &self,
         table_ref: impl Into<TableReference<'a>>,
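For the built-in formats nothing changes from the user's point of view: a statement like the one below still takes the create_listing_table path, and when the column list is omitted the schema is inferred from the files (provided_schema stays None). This is a minimal sketch assuming a directory of Parquet files at the hypothetical /data/sales/ location.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // "PARQUET" is one of the four built-in tokens, so this goes through
    // create_listing_table rather than a registered factory.
    ctx.sql("CREATE EXTERNAL TABLE sales STORED AS PARQUET LOCATION '/data/sales/'")
        .await?;
    let df = ctx.sql("SELECT COUNT(*) FROM sales").await?;
    df.show().await?;
    Ok(())
}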

datafusion/core/tests/sql/create_drop.rs

Lines changed: 4 additions & 2 deletions
@@ -427,8 +427,10 @@ async fn create_bad_custom_table() {
     match res {
         Ok(_) => panic!("Registration of tables without factories should fail"),
         Err(e) => {
-            assert!(e.to_string().contains("Unable to find factory for"), "Registration of tables without factories should throw correct error")
-        }
+            assert!(
+                e.to_string().contains("Unable to find factory for"),
+                "Registration of tables without factories should throw correct error"
+            )
         }
     }
 }
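The hunk only shows the reformatted assert; for context, a self-contained sketch of the same negative path might look like this, with a made-up UNKNOWNTYPE token and table name standing in for whatever the real test uses.

use datafusion::prelude::SessionContext;

#[tokio::test]
async fn create_custom_table_without_factory() {
    let ctx = SessionContext::new();
    // No factory has been registered for "UNKNOWNTYPE", so the statement should fail
    // with the error produced by create_custom_table.
    let res = ctx
        .sql("CREATE EXTERNAL TABLE t STORED AS UNKNOWNTYPE LOCATION '/tmp/t'")
        .await;
    let err = res.expect_err("registration without a factory should fail");
    assert!(err.to_string().contains("Unable to find factory for"));
}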

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 4 additions & 3 deletions
@@ -24,9 +24,10 @@ pub use builder::{table_scan, LogicalPlanBuilder};
 pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
-    EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
-    Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, StringifiedPlan,
-    Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, Window,
+    EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
+    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+    StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+    Values, Window,
 };
 
 pub use display::display_schema;

datafusion/sql/src/planner.rs

Lines changed: 2 additions & 2 deletions
@@ -26,8 +26,8 @@ use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
 use datafusion_expr::logical_plan::{
     Analyze, CreateCatalog, CreateCatalogSchema,
     CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
-    DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning,
-    PlanType, ToStringifiedPlan,
+    DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder,
+    Partitioning, PlanType, ToStringifiedPlan,
 };
 use datafusion_expr::utils::{
     can_hash, expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
