Skip to content

Commit 34475bb

Browse files
authored
Dynamic information_schema configuration and port more tests (#4722)
* Dynamic information_schema configuration and port more tests * sort rows
1 parent fe3f018 commit 34475bb

File tree

5 files changed

+146
-104
lines changed

5 files changed

+146
-104
lines changed

datafusion/core/src/catalog/information_schema.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ impl CatalogWithInformationSchema {
6969
inner,
7070
}
7171
}
72+
73+
/// Return a cloned `Arc` handle to the wrapped provider
74+
pub(crate) fn inner(&self) -> Arc<dyn CatalogProvider> {
75+
self.inner.clone()
76+
}
7277
}
7378

7479
impl CatalogProvider for CatalogWithInformationSchema {

datafusion/core/src/execution/context.rs

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,10 @@ impl SessionContext {
421421
))
422422
}
423423
}
424+
// Since information_schema config may have changed, revalidate
425+
if variable == OPT_INFORMATION_SCHEMA {
426+
state.update_information_schema();
427+
}
424428
drop(state);
425429

426430
self.return_empty_dataframe()
@@ -1546,17 +1550,10 @@ impl SessionState {
15461550

15471551
Self::register_default_schema(&config, &runtime, &default_catalog);
15481552

1549-
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema()
1550-
{
1551-
Arc::new(CatalogWithInformationSchema::new(
1552-
Arc::downgrade(&catalog_list),
1553-
Arc::new(default_catalog),
1554-
))
1555-
} else {
1556-
Arc::new(default_catalog)
1557-
};
1558-
catalog_list
1559-
.register_catalog(config.default_catalog.clone(), default_catalog);
1553+
catalog_list.register_catalog(
1554+
config.default_catalog.clone(),
1555+
Arc::new(default_catalog),
1556+
);
15601557
}
15611558

15621559
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
@@ -1583,7 +1580,7 @@ impl SessionState {
15831580
// To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.
15841581
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
15851582

1586-
SessionState {
1583+
let mut this = SessionState {
15871584
session_id,
15881585
optimizer: Optimizer::new(),
15891586
physical_optimizers,
@@ -1594,6 +1591,60 @@ impl SessionState {
15941591
config,
15951592
execution_props: ExecutionProps::new(),
15961593
runtime_env: runtime,
1594+
};
1595+
this.update_information_schema();
1596+
this
1597+
}
1598+
1599+
/// Enables/Disables information_schema support based on the value of
1600+
/// config.information_schema()
1601+
///
1602+
/// When enabled, all catalog providers are wrapped with
1603+
/// [`CatalogWithInformationSchema`] if needed
1604+
///
1605+
/// When disabled, any [`CatalogWithInformationSchema`] is unwrapped
1606+
fn update_information_schema(&mut self) {
1607+
let enabled = self.config.information_schema();
1608+
let catalog_list = &self.catalog_list;
1609+
1610+
let new_catalogs: Vec<_> = self
1611+
.catalog_list
1612+
.catalog_names()
1613+
.into_iter()
1614+
.map(|catalog_name| {
1615+
// unwrap because the list of names came from catalog
1616+
// list so it should still be there
1617+
let catalog = catalog_list.catalog(&catalog_name).unwrap();
1618+
1619+
let unwrapped = catalog
1620+
.as_any()
1621+
.downcast_ref::<CatalogWithInformationSchema>()
1622+
.map(|wrapped| wrapped.inner());
1623+
1624+
let new_catalog = match (enabled, unwrapped) {
1625+
// already wrapped, nothing needed
1626+
(true, Some(_)) => catalog,
1627+
(true, None) => {
1628+
// wrap the catalog in information schema
1629+
Arc::new(CatalogWithInformationSchema::new(
1630+
Arc::downgrade(catalog_list),
1631+
catalog,
1632+
))
1633+
}
1634+
// disabling, currently wrapped
1635+
(false, Some(unwrapped)) => unwrapped,
1636+
// disabling, currently unwrapped
1637+
(false, None) => catalog,
1638+
};
1639+
1640+
(catalog_name, new_catalog)
1641+
})
1642+
// collect to avoid concurrent modification
1643+
.collect();
1644+
1645+
// replace all catalogs
1646+
for (catalog_name, new_catalog) in new_catalogs {
1647+
catalog_list.register_catalog(catalog_name, new_catalog);
15971648
}
15981649
}
15991650

datafusion/core/tests/sql/information_schema.rs

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -30,91 +30,6 @@ use rstest::rstest;
3030

3131
use super::*;
3232

33-
#[tokio::test]
34-
async fn information_schema_tables_not_exist_by_default() {
35-
let ctx = SessionContext::new();
36-
37-
let err = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
38-
.await
39-
.unwrap_err();
40-
assert_eq!(
41-
err.to_string(),
42-
// Error propagates from SessionState::schema_for_ref
43-
"Error during planning: failed to resolve schema: information_schema"
44-
);
45-
}
46-
47-
#[tokio::test]
48-
async fn information_schema_tables_no_tables() {
49-
let ctx =
50-
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
51-
52-
let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
53-
.await
54-
.unwrap();
55-
56-
let expected = vec![
57-
"+---------------+--------------------+-------------+------------+",
58-
"| table_catalog | table_schema | table_name | table_type |",
59-
"+---------------+--------------------+-------------+------------+",
60-
"| datafusion | information_schema | columns | VIEW |",
61-
"| datafusion | information_schema | df_settings | VIEW |",
62-
"| datafusion | information_schema | tables | VIEW |",
63-
"| datafusion | information_schema | views | VIEW |",
64-
"+---------------+--------------------+-------------+------------+",
65-
];
66-
assert_batches_sorted_eq!(expected, &result);
67-
}
68-
69-
#[tokio::test]
70-
async fn information_schema_tables_tables_default_catalog() {
71-
let ctx =
72-
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
73-
74-
// Now, register an empty table
75-
ctx.register_table("t", table_with_sequence(1, 1).unwrap())
76-
.unwrap();
77-
78-
let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
79-
.await
80-
.unwrap();
81-
82-
let expected = vec![
83-
"+---------------+--------------------+-------------+------------+",
84-
"| table_catalog | table_schema | table_name | table_type |",
85-
"+---------------+--------------------+-------------+------------+",
86-
"| datafusion | information_schema | columns | VIEW |",
87-
"| datafusion | information_schema | df_settings | VIEW |",
88-
"| datafusion | information_schema | tables | VIEW |",
89-
"| datafusion | information_schema | views | VIEW |",
90-
"| datafusion | public | t | BASE TABLE |",
91-
"+---------------+--------------------+-------------+------------+",
92-
];
93-
assert_batches_sorted_eq!(expected, &result);
94-
95-
// Newly added tables should appear
96-
ctx.register_table("t2", table_with_sequence(1, 1).unwrap())
97-
.unwrap();
98-
99-
let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
100-
.await
101-
.unwrap();
102-
103-
let expected = vec![
104-
"+---------------+--------------------+-------------+------------+",
105-
"| table_catalog | table_schema | table_name | table_type |",
106-
"+---------------+--------------------+-------------+------------+",
107-
"| datafusion | information_schema | columns | VIEW |",
108-
"| datafusion | information_schema | df_settings | VIEW |",
109-
"| datafusion | information_schema | tables | VIEW |",
110-
"| datafusion | information_schema | views | VIEW |",
111-
"| datafusion | public | t | BASE TABLE |",
112-
"| datafusion | public | t2 | BASE TABLE |",
113-
"+---------------+--------------------+-------------+------------+",
114-
];
115-
assert_batches_sorted_eq!(expected, &result);
116-
}
117-
11833
#[tokio::test]
11934
async fn information_schema_tables_tables_with_multiple_catalogs() {
12035
let ctx =

datafusion/core/tests/sqllogictests/src/main.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use async_trait::async_trait;
1919
use datafusion::arrow::record_batch::RecordBatch;
20-
use datafusion::prelude::{SessionConfig, SessionContext};
20+
use datafusion::prelude::SessionContext;
2121
use datafusion_sql::parser::{DFParser, Statement};
2222
use log::info;
2323
use normalize::convert_batches;
@@ -130,12 +130,6 @@ async fn context_for_test_file(file_name: &str) -> SessionContext {
130130
setup::register_aggregate_tables(&ctx).await;
131131
ctx
132132
}
133-
"information_schema.slt" => {
134-
info!("Enabling information schema");
135-
SessionContext::with_config(
136-
SessionConfig::new().with_information_schema(true),
137-
)
138-
}
139133
_ => {
140134
info!("Using default SessionContext");
141135
SessionContext::new()

datafusion/core/tests/sqllogictests/test_files/information_schema.slt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,83 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
19+
# Verify the information schema does not exist by default
20+
statement error Error during planning: failed to resolve schema: information_schema
21+
SELECT * from information_schema.tables
22+
23+
statement error DataFusion error: Error during planning: SHOW \[VARIABLE\] is not supported unless information_schema is enabled
24+
show all
25+
26+
# Turn it on
27+
28+
# expect that the queries now work
29+
statement ok
30+
set datafusion.catalog.information_schema = true;
31+
32+
# Verify the information schema now does exist and is empty
33+
query CCC rowsort
34+
SELECT * from information_schema.tables;
35+
----
36+
datafusion information_schema columns VIEW
37+
datafusion information_schema df_settings VIEW
38+
datafusion information_schema tables VIEW
39+
datafusion information_schema views VIEW
40+
41+
# Disable information_schema and verify it now errors again
42+
statement ok
43+
set datafusion.catalog.information_schema = false
44+
45+
statement error Error during planning: failed to resolve schema: information_schema
46+
SELECT * from information_schema.tables
47+
48+
49+
############
50+
## Enable information schema for the rest of the test
51+
############
52+
statement ok
53+
set datafusion.catalog.information_schema = true
54+
55+
############
56+
# New tables should show up in information schema
57+
###########
58+
statement ok
59+
create table t as values (1);
60+
61+
query CCC rowsort
62+
SELECT * from information_schema.tables;
63+
----
64+
datafusion information_schema columns VIEW
65+
datafusion information_schema df_settings VIEW
66+
datafusion information_schema tables VIEW
67+
datafusion information_schema views VIEW
68+
datafusion public t BASE TABLE
69+
70+
# Another new table should show up in information schema
71+
statement ok
72+
create table t2 as values (1);
73+
74+
query CCC rowsort
75+
SELECT * from information_schema.tables;
76+
----
77+
datafusion information_schema columns VIEW
78+
datafusion information_schema df_settings VIEW
79+
datafusion information_schema tables VIEW
80+
datafusion information_schema views VIEW
81+
datafusion public t BASE TABLE
82+
datafusion public t2 BASE TABLE
83+
84+
# Cleanup
85+
statement ok
86+
drop table t
87+
88+
statement ok
89+
drop table t2
90+
91+
############
92+
## SHOW VARIABLES should work
93+
###########
94+
1895
# target_partitions defaults to num_cores, so set
1996
# to a known value that is unlikely to be
2097
# the real number of cores on a system

0 commit comments

Comments
 (0)