Skip to content

feat: add SchemaProvider::table_type(table_name: &str) #16401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions datafusion/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
if let Some(table_type) =
schema.table_type(&table_name).await?
{
builder.add_table(
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
table_type,
);
}
}
Expand Down Expand Up @@ -1359,3 +1361,6 @@ impl PartitionStream for InformationSchemaParameters {
))
}
}

#[cfg(test)]
mod tests;
88 changes: 88 additions & 0 deletions datafusion/catalog/src/information_schema/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::sync::Arc;

use crate::CatalogProvider;

use super::*;

#[tokio::test]
async fn make_tables_uses_table_type() {
let config = InformationSchemaConfig {
catalog_list: Arc::new(Fixture),
};
let mut builder = InformationSchemaTablesBuilder {
catalog_names: StringBuilder::new(),
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
table_types: StringBuilder::new(),
schema: Arc::new(Schema::empty()),
};

assert!(config.make_tables(&mut builder).await.is_ok());

assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
}

#[derive(Debug)]
struct Fixture;

#[async_trait]
impl SchemaProvider for Fixture {
// InformationSchemaConfig::make_tables should use this.
async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
Ok(Some(TableType::Base))
}

// InformationSchemaConfig::make_tables used this before `table_type`
// existed but should not, as it may be expensive.
async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
panic!("InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type")
}

fn as_any(&self) -> &dyn Any {
unimplemented!("not required for these tests")
}

fn table_names(&self) -> Vec<String> {
vec!["atable".to_string()]
}

fn table_exist(&self, _: &str) -> bool {
unimplemented!("not required for these tests")
}
}

impl CatalogProviderList for Fixture {
fn as_any(&self) -> &dyn Any {
unimplemented!("not required for these tests")
}

fn register_catalog(
&self,
_: String,
_: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
unimplemented!("not required for these tests")
}

fn catalog_names(&self) -> Vec<String> {
vec!["acatalog".to_string()]
}

fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
Some(Arc::new(Self))
}
}

impl CatalogProvider for Fixture {
fn as_any(&self) -> &dyn Any {
unimplemented!("not required for these tests")
}

fn schema_names(&self) -> Vec<String> {
vec!["aschema".to_string()]
}

fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
Some(Arc::new(Self))
}
}
9 changes: 9 additions & 0 deletions datafusion/catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;

use crate::table::TableProvider;
use datafusion_common::Result;
use datafusion_expr::TableType;

/// Represents a schema, comprising a number of named tables.
///
Expand Down Expand Up @@ -54,6 +55,14 @@ pub trait SchemaProvider: Debug + Sync + Send {
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>;

/// Retrieves the type of a specific table from the schema by name, if it exists, otherwise
/// returns `None`. Implementations for which this operation is cheap but [Self::table] is
/// expensive can override this to improve operations that only need the type, e.g.
/// `SELECT * FROM information_schema.tables`.
async fn table_type(&self, name: &str) -> Result<Option<TableType>> {
self.table(name).await.map(|o| o.map(|t| t.table_type()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any way to avoid nested map? perhaps flat_map or and_then ? would me more readable IMHO

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to and_then.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, clippy suggested what I had to begin with:

warning: using `Result.and_then(|x| Ok(y))`, which is more succinctly expressed as `map(|x| y)`
  --> datafusion/catalog/src/schema.rs:63:9
   |
63 | /         self.table(name)
64 | |             .await
65 | |             .and_then(|o| Ok(o.map(|t| t.table_type())))
   | |________________________________________________________^
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bind_instead_of_map
   = note: `#[warn(clippy::bind_instead_of_map)]` on by default
help: try
   |
63 ~         self.table(name)
64 +             .await.map(|o| o.map(|t| t.table_type()))

So I changed it back.

}

/// If supported by the implementation, adds a new table named `name` to
/// this schema.
///
Expand Down
Loading