Skip to content

Commit ab00bc1

Browse files
authored
[Feature] support describe file (#4995)
To know the columns/types in a file, users has to create an external table, and describe the table. Sometimes the infer schema is wrong for creating table, to make it right, user need to drop the table, and recreate a table with the specify schema. To solve this problem, we add describe file interface in datafusion-clie, With the Describe File, user can know the infer schema is wrong before creating the table. Syntax: Describe file_path, Example: DESCRIBE 'tests/data/aggregate_simple_pipe.csv'; Return: column_name data_type is_nullable c1 Float32 NO c2 Float64 NO c3 Boolean NO Signed-off-by: xyz <a997647204@gmail.com> Signed-off-by: xyz <a997647204@gmail.com>
1 parent 9f498bb commit ab00bc1

File tree

13 files changed

+193
-47
lines changed

13 files changed

+193
-47
lines changed

datafusion-cli/src/command.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub enum Command {
3939
Quit,
4040
Help,
4141
ListTables,
42-
DescribeTable(String),
42+
DescribeTableStmt(String),
4343
ListFunctions,
4444
Include(Option<String>),
4545
SearchFunctions(String),
@@ -65,7 +65,7 @@ impl Command {
6565
let batches = df.collect().await?;
6666
print_options.print_batches(&batches, now)
6767
}
68-
Self::DescribeTable(name) => {
68+
Self::DescribeTableStmt(name) => {
6969
let df = ctx.sql(&format!("SHOW COLUMNS FROM {}", name)).await?;
7070
let batches = df.collect().await?;
7171
print_options.print_batches(&batches, now)
@@ -125,7 +125,7 @@ impl Command {
125125
match self {
126126
Self::Quit => ("\\q", "quit datafusion-cli"),
127127
Self::ListTables => ("\\d", "list tables"),
128-
Self::DescribeTable(_) => ("\\d name", "describe table"),
128+
Self::DescribeTableStmt(_) => ("\\d name", "describe table"),
129129
Self::Help => ("\\?", "help"),
130130
Self::Include(_) => {
131131
("\\i filename", "reads input from the specified filename")
@@ -142,7 +142,7 @@ impl Command {
142142

143143
const ALL_COMMANDS: [Command; 9] = [
144144
Command::ListTables,
145-
Command::DescribeTable(String::new()),
145+
Command::DescribeTableStmt(String::new()),
146146
Command::Quit,
147147
Command::Help,
148148
Command::Include(Some(String::new())),
@@ -183,7 +183,7 @@ impl FromStr for Command {
183183
Ok(match (c, arg) {
184184
("q", None) => Self::Quit,
185185
("d", None) => Self::ListTables,
186-
("d", Some(name)) => Self::DescribeTable(name.into()),
186+
("d", Some(name)) => Self::DescribeTableStmt(name.into()),
187187
("?", None) => Self::Help,
188188
("h", None) => Self::ListFunctions,
189189
("h", Some(function)) => Self::SearchFunctions(function.into()),

datafusion/core/src/execution/context.rs

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::{
2727
optimizer::PhysicalOptimizerRule,
2828
},
2929
};
30+
use datafusion_expr::DescribeTable;
3031
pub use datafusion_physical_expr::execution_props::ExecutionProps;
3132
use datafusion_physical_expr::var_provider::is_system_variables;
3233
use parking_lot::RwLock;
@@ -42,8 +43,11 @@ use std::{
4243
};
4344
use std::{ops::ControlFlow, sync::Weak};
4445

45-
use arrow::datatypes::{DataType, SchemaRef};
4646
use arrow::record_batch::RecordBatch;
47+
use arrow::{
48+
array::StringBuilder,
49+
datatypes::{DataType, Field, Schema, SchemaRef},
50+
};
4751

4852
use crate::catalog::{
4953
catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -360,6 +364,10 @@ impl SessionContext {
360364
self.return_empty_dataframe()
361365
}
362366

367+
LogicalPlan::DescribeTable(DescribeTable { schema, .. }) => {
368+
self.return_describe_table_dataframe(schema).await
369+
}
370+
363371
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
364372
schema_name,
365373
if_not_exists,
@@ -442,6 +450,53 @@ impl SessionContext {
442450
Ok(DataFrame::new(self.state(), plan))
443451
}
444452

453+
// return an record_batch which describe table
454+
async fn return_describe_table_record_batch(
455+
&self,
456+
schema: Arc<Schema>,
457+
) -> Result<RecordBatch> {
458+
let record_batch_schema = Arc::new(Schema::new(vec![
459+
Field::new("column_name", DataType::Utf8, false),
460+
Field::new("data_type", DataType::Utf8, false),
461+
Field::new("is_nullable", DataType::Utf8, false),
462+
]));
463+
464+
let mut column_names = StringBuilder::new();
465+
let mut data_types = StringBuilder::new();
466+
let mut is_nullables = StringBuilder::new();
467+
for (_, field) in schema.fields().iter().enumerate() {
468+
column_names.append_value(field.name());
469+
470+
// "System supplied type" --> Use debug format of the datatype
471+
let data_type = field.data_type();
472+
data_types.append_value(format!("{data_type:?}"));
473+
474+
// "YES if the column is possibly nullable, NO if it is known not nullable. "
475+
let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
476+
is_nullables.append_value(nullable_str);
477+
}
478+
479+
let record_batch = RecordBatch::try_new(
480+
record_batch_schema,
481+
vec![
482+
Arc::new(column_names.finish()),
483+
Arc::new(data_types.finish()),
484+
Arc::new(is_nullables.finish()),
485+
],
486+
)?;
487+
488+
Ok(record_batch)
489+
}
490+
491+
// return an dataframe which describe file
492+
async fn return_describe_table_dataframe(
493+
&self,
494+
schema: Arc<Schema>,
495+
) -> Result<DataFrame> {
496+
let record_batch = self.return_describe_table_record_batch(schema).await?;
497+
self.read_batch(record_batch)
498+
}
499+
445500
async fn create_external_table(
446501
&self,
447502
cmd: &CreateExternalTable,
@@ -1719,7 +1774,7 @@ impl SessionState {
17191774
DFStatement::CreateExternalTable(table) => {
17201775
relations.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
17211776
}
1722-
DFStatement::DescribeTable(table) => {
1777+
DFStatement::DescribeTableStmt(table) => {
17231778
relations
17241779
.get_or_insert_with(&table.table_name, |_| table.table_name.clone());
17251780
}
@@ -2058,7 +2113,6 @@ mod tests {
20582113
use crate::test_util::parquet_test_data;
20592114
use crate::variable::VarType;
20602115
use arrow::array::ArrayRef;
2061-
use arrow::datatypes::*;
20622116
use arrow::record_batch::RecordBatch;
20632117
use async_trait::async_trait;
20642118
use datafusion_expr::{create_udaf, create_udf, Expr, Volatility};

datafusion/core/src/physical_plan/planner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,11 @@ impl DefaultPhysicalPlanner {
11911191
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
11921192
))
11931193
}
1194+
LogicalPlan::DescribeTable(_) => {
1195+
Err(DataFusionError::Internal(
1196+
"Unsupported logical plan: DescribeTable must be root of the plan".to_string(),
1197+
))
1198+
}
11941199
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
11951200
"Unsupported logical plan: Explain must be root of the plan".to_string(),
11961201
)),
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
##########
20+
# Describe internal tables when information_schema is true
21+
##########
22+
23+
statement ok
24+
set datafusion.catalog.information_schema = true
25+
26+
statement ok
27+
CREATE external table aggregate_simple(c1 real, c2 double, c3 boolean) STORED as CSV WITH HEADER ROW LOCATION 'tests/data/aggregate_simple.csv';
28+
29+
query C1
30+
DESCRIBE aggregate_simple;
31+
----
32+
c1 Float32 NO
33+
c2 Float64 NO
34+
c3 Boolean NO
35+
36+
statement ok
37+
DROP TABLE aggregate_simple;
38+
39+
##########
40+
# Describe internal tables when information_schema is false
41+
##########
42+
43+
statement ok
44+
set datafusion.catalog.information_schema = false
45+
46+
statement ok
47+
CREATE external table aggregate_simple(c1 real, c2 double, c3 boolean) STORED as CSV WITH HEADER ROW LOCATION 'tests/data/aggregate_simple.csv';
48+
49+
query C2
50+
DESCRIBE aggregate_simple;
51+
----
52+
c1 Float32 NO
53+
c2 Float64 NO
54+
c3 Boolean NO
55+
56+
statement ok
57+
DROP TABLE aggregate_simple;
58+
59+
##########
60+
# Describe file (currently we can only describe file in datafusion-cli, fix this after issue (#4850) has been done)
61+
##########
62+
63+
statement error Error during planning: table 'datafusion.public.tests/data/aggregate_simple.csv' not found
64+
DESCRIBE 'tests/data/aggregate_simple.csv';

datafusion/expr/src/lib.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,12 @@ pub use logical_plan::{
7171
build_join_schema, union, wrap_projection_for_join_if_necessary, UNNAMED_TABLE,
7272
},
7373
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
74-
CreateMemoryTable, CreateView, CrossJoin, Distinct, DmlStatement, DropTable,
75-
DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType,
76-
Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, PlanVisitor,
77-
Projection, Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias,
78-
TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window, WriteOp,
74+
CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement,
75+
DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
76+
JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType,
77+
PlanVisitor, Projection, Repartition, SetVariable, Sort, StringifiedPlan, Subquery,
78+
SubqueryAlias, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
79+
Window, WriteOp,
7980
};
8081
pub use nullif::SUPPORTED_NULLIF_TYPES;
8182
pub use operator::Operator;

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ mod plan;
2323
pub use builder::{table_scan, LogicalPlanBuilder};
2424
pub use plan::{
2525
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
26-
CreateMemoryTable, CreateView, CrossJoin, Distinct, DmlStatement, DropTable,
27-
DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType,
28-
Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Prepare, Projection,
29-
Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
30-
ToStringifiedPlan, Union, Values, Window, WriteOp,
26+
CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement,
27+
DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
28+
JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Prepare,
29+
Projection, Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias,
30+
TableScan, ToStringifiedPlan, Union, Values, Window, WriteOp,
3131
};
3232

3333
pub use display::display_schema;

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ pub enum LogicalPlan {
121121
Prepare(Prepare),
122122
/// Insert / Update / Delete
123123
Dml(DmlStatement),
124+
/// Describe the schema of table
125+
DescribeTable(DescribeTable),
124126
}
125127

126128
impl LogicalPlan {
@@ -161,6 +163,9 @@ impl LogicalPlan {
161163
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
162164
LogicalPlan::DropView(DropView { schema, .. }) => schema,
163165
LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema,
166+
LogicalPlan::DescribeTable(DescribeTable { dummy_schema, .. }) => {
167+
dummy_schema
168+
}
164169
LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema,
165170
}
166171
}
@@ -221,6 +226,7 @@ impl LogicalPlan {
221226
| LogicalPlan::Prepare(Prepare { input, .. }) => input.all_schemas(),
222227
LogicalPlan::DropTable(_)
223228
| LogicalPlan::DropView(_)
229+
| LogicalPlan::DescribeTable(_)
224230
| LogicalPlan::SetVariable(_) => vec![],
225231
LogicalPlan::Dml(DmlStatement { table_schema, .. }) => vec![table_schema],
226232
}
@@ -322,6 +328,7 @@ impl LogicalPlan {
322328
| LogicalPlan::Union(_)
323329
| LogicalPlan::Distinct(_)
324330
| LogicalPlan::Dml(_)
331+
| LogicalPlan::DescribeTable(_)
325332
| LogicalPlan::Prepare(_) => Ok(()),
326333
}
327334
}
@@ -363,7 +370,8 @@ impl LogicalPlan {
363370
| LogicalPlan::CreateCatalog(_)
364371
| LogicalPlan::DropTable(_)
365372
| LogicalPlan::SetVariable(_)
366-
| LogicalPlan::DropView(_) => vec![],
373+
| LogicalPlan::DropView(_)
374+
| LogicalPlan::DescribeTable(_) => vec![],
367375
}
368376
}
369377

@@ -561,7 +569,8 @@ impl LogicalPlan {
561569
| LogicalPlan::CreateCatalog(_)
562570
| LogicalPlan::DropTable(_)
563571
| LogicalPlan::SetVariable(_)
564-
| LogicalPlan::DropView(_) => true,
572+
| LogicalPlan::DropView(_)
573+
| LogicalPlan::DescribeTable(_) => true,
565574
};
566575
if !recurse {
567576
return Ok(false);
@@ -1170,6 +1179,9 @@ impl LogicalPlan {
11701179
}) => {
11711180
write!(f, "Prepare: {name:?} {data_types:?} ")
11721181
}
1182+
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
1183+
write!(f, "DescribeTable")
1184+
}
11731185
}
11741186
}
11751187
}
@@ -1625,6 +1637,15 @@ pub struct Prepare {
16251637
pub input: Arc<LogicalPlan>,
16261638
}
16271639

1640+
/// Describe the schema of table
1641+
#[derive(Clone)]
1642+
pub struct DescribeTable {
1643+
/// Table schema
1644+
pub schema: Arc<Schema>,
1645+
/// Dummy schema
1646+
pub dummy_schema: DFSchemaRef,
1647+
}
1648+
16281649
/// Produces a relation with string representations of
16291650
/// various parts of the plan
16301651
#[derive(Clone)]

datafusion/expr/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,7 @@ pub fn from_plan(
738738
assert!(inputs.is_empty(), "{plan:?} should have no inputs");
739739
Ok(plan.clone())
740740
}
741+
LogicalPlan::DescribeTable(_) => Ok(plan.clone()),
741742
}
742743
}
743744

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ impl OptimizerRule for CommonSubexprEliminate {
237237
| LogicalPlan::DropTable(_)
238238
| LogicalPlan::DropView(_)
239239
| LogicalPlan::SetVariable(_)
240+
| LogicalPlan::DescribeTable(_)
240241
| LogicalPlan::Distinct(_)
241242
| LogicalPlan::Extension(_)
242243
| LogicalPlan::Dml(_)

datafusion/optimizer/src/push_down_projection.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ fn optimize_plan(
409409
| LogicalPlan::DropTable(_)
410410
| LogicalPlan::DropView(_)
411411
| LogicalPlan::SetVariable(_)
412+
| LogicalPlan::DescribeTable(_)
412413
| LogicalPlan::CrossJoin(_)
413414
| LogicalPlan::Dml(_)
414415
| LogicalPlan::Extension { .. }

0 commit comments

Comments
 (0)