Skip to content

Commit 0084aeb

Browse files
psvrialamb
andauthored
Add show external tables (#3279)
* Add show external tables * Use builder-style API to set ListingTable definition Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 7c04964 commit 0084aeb

File tree

9 files changed

+85
-8
lines changed

9 files changed

+85
-8
lines changed

datafusion-examples/examples/parquet_sql_multiple_files.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ async fn main() -> Result<()> {
4949
&format!("file://{}", testdata),
5050
listing_options,
5151
None,
52+
None,
5253
)
5354
.await
5455
.unwrap();

datafusion/core/src/datasource/listing/table.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use super::PartitionedFile;
4848

4949
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
5050

51-
/// Configuration for creating a 'ListingTable'
51+
/// Configuration for creating a 'ListingTable'
5252
pub struct ListingTableConfig {
5353
/// Paths on the `ObjectStore` for creating `ListingTable`.
5454
/// They should share the same schema and object store.
@@ -246,6 +246,7 @@ pub struct ListingTable {
246246
/// File fields + partition columns
247247
table_schema: SchemaRef,
248248
options: ListingOptions,
249+
definition: Option<String>,
249250
}
250251

251252
impl ListingTable {
@@ -280,11 +281,18 @@ impl ListingTable {
280281
file_schema,
281282
table_schema: Arc::new(Schema::new(table_fields)),
282283
options,
284+
definition: None,
283285
};
284286

285287
Ok(table)
286288
}
287289

290+
/// Specify the SQL definition for this table, if any
291+
pub fn with_definition(mut self, defintion: Option<String>) -> Self {
292+
self.definition = defintion;
293+
self
294+
}
295+
288296
/// Get paths ref
289297
pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
290298
&self.table_paths
@@ -358,6 +366,10 @@ impl TableProvider for ListingTable {
358366
Ok(TableProviderFilterPushDown::Inexact)
359367
}
360368
}
369+
370+
fn get_table_definition(&self) -> Option<&str> {
371+
self.definition.as_deref()
372+
}
361373
}
362374

363375
impl ListingTable {

datafusion/core/src/execution/context.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@ impl SessionContext {
502502
cmd.location.clone(),
503503
options,
504504
provided_schema,
505+
cmd.definition.clone(),
505506
)
506507
.await?;
507508
self.return_empty_dataframe()
@@ -720,6 +721,7 @@ impl SessionContext {
720721
table_path: impl AsRef<str>,
721722
options: ListingOptions,
722723
provided_schema: Option<SchemaRef>,
724+
sql: Option<String>,
723725
) -> Result<()> {
724726
let table_path = ListingTableUrl::parse(table_path)?;
725727
let resolved_schema = match provided_schema {
@@ -729,7 +731,7 @@ impl SessionContext {
729731
let config = ListingTableConfig::new(table_path)
730732
.with_listing_options(options)
731733
.with_schema(resolved_schema);
732-
let table = ListingTable::try_new(config)?;
734+
let table = ListingTable::try_new(config)?.with_definition(sql);
733735
self.register_table(name, Arc::new(table))?;
734736
Ok(())
735737
}
@@ -750,6 +752,7 @@ impl SessionContext {
750752
table_path,
751753
listing_options,
752754
options.schema.map(|s| Arc::new(s.to_owned())),
755+
None,
753756
)
754757
.await?;
755758

@@ -767,8 +770,14 @@ impl SessionContext {
767770
let listing_options =
768771
options.to_listing_options(self.copied_config().target_partitions);
769772

770-
self.register_listing_table(name, table_path, listing_options, options.schema)
771-
.await?;
773+
self.register_listing_table(
774+
name,
775+
table_path,
776+
listing_options,
777+
options.schema,
778+
None,
779+
)
780+
.await?;
772781
Ok(())
773782
}
774783

@@ -788,7 +797,7 @@ impl SessionContext {
788797
.parquet_pruning(parquet_pruning)
789798
.to_listing_options(target_partitions);
790799

791-
self.register_listing_table(name, table_path, listing_options, None)
800+
self.register_listing_table(name, table_path, listing_options, None, None)
792801
.await?;
793802
Ok(())
794803
}
@@ -804,8 +813,14 @@ impl SessionContext {
804813
let listing_options =
805814
options.to_listing_options(self.copied_config().target_partitions);
806815

807-
self.register_listing_table(name, table_path, listing_options, options.schema)
808-
.await?;
816+
self.register_listing_table(
817+
name,
818+
table_path,
819+
listing_options,
820+
options.schema,
821+
None,
822+
)
823+
.await?;
809824
Ok(())
810825
}
811826

datafusion/core/tests/sql/information_schema.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,29 @@ async fn show_create_table() {
605605
assert_batches_eq!(expected, &results);
606606
}
607607

608+
#[tokio::test]
609+
async fn show_external_create_table() {
610+
let ctx =
611+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
612+
613+
let table_sql =
614+
"CREATE EXTERNAL TABLE abc STORED AS CSV WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv'";
615+
plan_and_collect(&ctx, table_sql).await.unwrap();
616+
617+
let result_sql = "SHOW CREATE TABLE abc";
618+
let results = plan_and_collect(&ctx, result_sql).await.unwrap();
619+
620+
let expected = vec![
621+
"+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+",
622+
"| table_catalog | table_schema | table_name | definition |",
623+
"+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+",
624+
"| datafusion | public | abc | CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv |",
625+
"+---------------+--------------+------------+-------------------------------------------------------------------------------------------------+",
626+
];
627+
628+
assert_batches_eq!(expected, &results);
629+
}
630+
608631
/// Execute SQL and return results
609632
async fn plan_and_collect(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
610633
ctx.sql(sql).await?.collect().await

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,6 +1255,8 @@ pub struct CreateExternalTable {
12551255
pub table_partition_cols: Vec<String>,
12561256
/// Option to not error if table already exists
12571257
pub if_not_exists: bool,
1258+
/// SQL used to create the table, if available
1259+
pub definition: Option<String>,
12581260
}
12591261

12601262
/// Produces a relation with string representations of

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ message CreateExternalTableNode {
153153
repeated string table_partition_cols = 6;
154154
bool if_not_exists = 7;
155155
string delimiter = 8;
156+
string definition = 9;
156157
}
157158

158159
message CreateCatalogSchemaNode {

datafusion/proto/src/logical_plan.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,12 @@ impl AsLogicalPlan for LogicalPlanNode {
461461
))
462462
})?;
463463

464+
let definition = if !create_extern_table.definition.is_empty() {
465+
Some(create_extern_table.definition.clone())
466+
} else {
467+
None
468+
};
469+
464470
match create_extern_table.file_type.as_str() {
465471
"CSV" | "JSON" | "PARQUET" | "AVRO" => {}
466472
it => {
@@ -486,6 +492,7 @@ impl AsLogicalPlan for LogicalPlanNode {
486492
.table_partition_cols
487493
.clone(),
488494
if_not_exists: create_extern_table.if_not_exists,
495+
definition,
489496
}))
490497
}
491498
LogicalPlanType::CreateView(create_view) => {
@@ -1030,6 +1037,7 @@ impl AsLogicalPlan for LogicalPlanNode {
10301037
schema: df_schema,
10311038
table_partition_cols,
10321039
if_not_exists,
1040+
definition,
10331041
}) => Ok(protobuf::LogicalPlanNode {
10341042
logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
10351043
protobuf::CreateExternalTableNode {
@@ -1041,6 +1049,7 @@ impl AsLogicalPlan for LogicalPlanNode {
10411049
table_partition_cols: table_partition_cols.clone(),
10421050
if_not_exists: *if_not_exists,
10431051
delimiter: String::from(*delimiter),
1052+
definition: definition.clone().unwrap_or_else(|| "".to_string()),
10441053
},
10451054
)),
10461055
}),

datafusion/sql/src/parser.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use sqlparser::{
2525
parser::{Parser, ParserError},
2626
tokenizer::{Token, Tokenizer},
2727
};
28-
use std::collections::VecDeque;
28+
use std::{collections::VecDeque, fmt};
2929

3030
// Use `Parser::expected` instead, if possible
3131
macro_rules! parser_err {
@@ -59,6 +59,18 @@ pub struct CreateExternalTable {
5959
pub if_not_exists: bool,
6060
}
6161

62+
impl fmt::Display for CreateExternalTable {
63+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64+
write!(f, "CREATE EXTERNAL TABLE ")?;
65+
if self.if_not_exists {
66+
write!(f, "IF NOT EXSISTS ")?;
67+
}
68+
write!(f, "{} ", self.name)?;
69+
write!(f, "STORED AS {} ", self.file_type)?;
70+
write!(f, "LOCATION {} ", self.location)
71+
}
72+
}
73+
6274
/// DataFusion extension DDL for `DESCRIBE TABLE`
6375
#[derive(Debug, Clone, PartialEq, Eq)]
6476
pub struct DescribeTable {

datafusion/sql/src/planner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
452452
&self,
453453
statement: CreateExternalTable,
454454
) -> Result<LogicalPlan> {
455+
let definition = Some(statement.to_string());
455456
let CreateExternalTable {
456457
name,
457458
columns,
@@ -481,6 +482,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
481482
delimiter,
482483
table_partition_cols,
483484
if_not_exists,
485+
definition,
484486
}))
485487
}
486488

0 commit comments

Comments
 (0)