diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs new file mode 100644 index 0000000000000..f4ec851632f71 --- /dev/null +++ b/datafusion/core/src/catalog/listing_schema.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically +use crate::catalog::schema::SchemaProvider; +use crate::datasource::datasource::TableProviderFactory; +use crate::datasource::TableProvider; +use datafusion_common::DataFusionError; +use futures::TryStreamExt; +use object_store::ObjectStore; +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables +/// +/// A subfolder relationship is assumed, i.e. given: +/// authority = s3://host.example.com:3000 +/// path = /data/tpch +/// factory = `DeltaTableFactory` +/// +/// A table called "customer" will be registered for the folder: +/// s3://host.example.com:3000/data/tpch/customer +/// +/// assuming it contains valid deltalake data, i.e: +/// s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet +/// s3://host.example.com:3000/data/tpch/customer/_delta_log/ +pub struct ListingSchemaProvider { + authority: String, + path: object_store::path::Path, + factory: Arc, + store: Arc, + tables: Arc>>>, +} + +impl ListingSchemaProvider { + /// Create a new `ListingSchemaProvider` + /// + /// Arguments: + /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000) + /// `path`: The root path that contains subfolders which represent tables + /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder + /// `store`: The `ObjectStore` containing the table data + pub fn new( + authority: String, + path: object_store::path::Path, + factory: Arc, + store: Arc, + ) -> Self { + Self { + authority, + path, + factory, + store, + tables: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Reload table information from ObjectStore + pub async fn refresh(&self) -> datafusion_common::Result<()> { + let entries: Vec<_> = self + .store + .list(Some(&self.path)) + .await? + .try_collect() + .await?; + let base = Path::new(self.path.as_ref()); + let mut tables = HashSet::new(); + for file in entries.iter() { + let mut parent = Path::new(file.location.as_ref()); + while let Some(p) = parent.parent() { + if p == base { + tables.insert(parent); + } + parent = p; + } + } + for table in tables.iter() { + let file_name = table + .file_name() + .ok_or_else(|| { + DataFusionError::Internal("Cannot parse file name!".to_string()) + })? + .to_str() + .ok_or_else(|| { + DataFusionError::Internal("Cannot parse file name!".to_string()) + })?; + let table_name = table.to_str().ok_or_else(|| { + DataFusionError::Internal("Cannot parse file name!".to_string()) + })?; + if !self.table_exist(file_name) { + let table_name = format!("{}/{}", self.authority, table_name); + let provider = self.factory.create(table_name.as_str()).await?; + let _ = self.register_table(file_name.to_string(), provider.clone())?; + } + } + Ok(()) + } +} + +impl SchemaProvider for ListingSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables + .lock() + .expect("Can't lock tables") + .keys() + .map(|it| it.to_string()) + .collect() + } + + fn table(&self, name: &str) -> Option> { + self.tables + .lock() + .expect("Can't lock tables") + .get(name) + .cloned() + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> datafusion_common::Result>> { + self.tables + .lock() + .expect("Can't lock tables") + .insert(name, table.clone()); + Ok(Some(table)) + } + + fn deregister_table( + &self, + name: &str, + ) -> datafusion_common::Result>> { + Ok(self.tables.lock().expect("Can't lock tables").remove(name)) + } + + fn table_exist(&self, name: &str) -> bool { + self.tables + .lock() + .expect("Can't lock tables") + .contains_key(name) + } +} diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog/mod.rs index 0720f451ec6c7..0eabd8698afb0 100644 --- a/datafusion/core/src/catalog/mod.rs +++ b/datafusion/core/src/catalog/mod.rs @@ -21,6 +21,7 @@ #![allow(clippy::module_inception)] pub mod catalog; pub mod information_schema; +pub mod listing_schema; pub mod schema; pub use datafusion_sql::{ResolvedTableReference, TableReference}; diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index ea9fa765e6a36..8277cef31d4dc 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -68,6 +68,12 @@ pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str = /// Configuration option "datafusion.optimizer.max_passes" pub const OPT_OPTIMIZER_MAX_PASSES: &str = "datafusion.optimizer.max_passes"; +/// Location scanned to load tables for `default` schema +pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location"; + +/// Type of `TableProvider` to use when loading `default` schema +pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type"; + /// Definition of a configuration option pub struct ConfigDefinition { /// key used to identifier this configuration option @@ -144,13 +150,13 @@ impl ConfigDefinition { pub fn new_string( key: impl Into, description: impl Into, - default_value: String, + default_value: Option, ) -> Self { Self::new( key, description, DataType::Utf8, - ScalarValue::Utf8(Some(default_value)), + ScalarValue::Utf8(default_value), ) } } @@ -217,7 +223,7 @@ impl BuiltInConfigs { "The session time zone which some function require \ e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, then extract the hour.", - "UTC".into() + Some("UTC".into()), ), ConfigDefinition::new_bool( OPT_PARQUET_PUSHDOWN_FILTERS, @@ -245,11 +251,22 @@ impl BuiltInConfigs { rule. When set to false, any rules that produce errors will cause the query to fail.", true ), - ConfigDefinition::new_u64( - OPT_OPTIMIZER_MAX_PASSES, - "Number of times that the optimizer will attempt to optimize the plan", - 3 - )] + ConfigDefinition::new_u64( + OPT_OPTIMIZER_MAX_PASSES, + "Number of times that the optimizer will attempt to optimize the plan", + 3 + ), + ConfigDefinition::new_string( + OPT_CATALOG_LOCATION, + "Location scanned to load tables for `default` schema, defaults to None", + None, + ), + ConfigDefinition::new_string( + OPT_CATALOG_TYPE, + "Type of `TableProvider` to use when loading `default` schema. Defaults to None", + None, + ), + ] } } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index d4962f87359c3..0196958d9f993 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -97,7 +97,10 @@ use datafusion_sql::{ planner::{ContextProvider, SqlToRel}, }; use parquet::file::properties::WriterProperties; +use url::Url; +use crate::catalog::listing_schema::ListingSchemaProvider; +use crate::datasource::object_store::ObjectStoreUrl; use uuid::Uuid; use super::options::{ @@ -914,6 +917,11 @@ impl SessionContext { state.catalog_list.register_catalog(name, catalog) } + /// Retrieves the list of available catalog names. + pub fn catalog_names(&self) -> Vec { + self.state.read().catalog_list.catalog_names() + } + /// Retrieves a [`CatalogProvider`] instance by name pub fn catalog(&self, name: &str) -> Option> { self.state.read().catalog_list.catalog(name) @@ -1258,6 +1266,11 @@ impl SessionConfig { self.set(key, ScalarValue::UInt64(Some(value))) } + /// Set a generic `str` configuration option + pub fn set_str(self, key: &str, value: &str) -> Self { + self.set(key, ScalarValue::Utf8(Some(value.to_string()))) + } + /// Customize batch size pub fn with_batch_size(self, n: usize) -> Self { // batch size must be greater than zero @@ -1508,6 +1521,8 @@ impl SessionState { ) .expect("memory catalog provider can register schema"); + Self::register_default_schema(&config, &runtime, &default_catalog); + let default_catalog: Arc = if config.information_schema { Arc::new(CatalogWithInformationSchema::new( Arc::downgrade(&catalog_list), @@ -1566,6 +1581,48 @@ impl SessionState { } } + fn register_default_schema( + config: &SessionConfig, + runtime: &Arc, + default_catalog: &MemoryCatalogProvider, + ) { + let url = config + .config_options + .read() + .get("datafusion.catalog.location"); + let format = config.config_options.read().get("datafusion.catalog.type"); + let (url, format) = match (url, format) { + (Some(url), Some(format)) => (url, format), + _ => return, + }; + if url.is_null() || format.is_null() { + return; + } + let url = url.to_string(); + let format = format.to_string(); + let url = Url::parse(url.as_str()).expect("Invalid default catalog location!"); + let authority = match url.host_str() { + Some(host) => format!("{}://{}", url.scheme(), host), + None => format!("{}://", url.scheme()), + }; + let path = &url.as_str()[authority.len() as usize..]; + let path = object_store::path::Path::parse(path).expect("Can't parse path"); + let store = ObjectStoreUrl::parse(authority.as_str()) + .expect("Invalid default catalog url"); + let store = match runtime.object_store(store) { + Ok(store) => store, + _ => return, + }; + let factory = match runtime.table_factories.get(format.as_str()) { + Some(factory) => factory, + _ => return, + }; + let schema = ListingSchemaProvider::new(authority, path, factory.clone(), store); + let _ = default_catalog + .register_schema("default", Arc::new(schema)) + .expect("Failed to register default schema"); + } + fn resolve_table_ref<'a>( &'a self, table_ref: impl Into>, @@ -1947,10 +2004,12 @@ impl FunctionRegistry for TaskContext { mod tests { use super::*; use crate::assert_batches_eq; + use crate::datasource::datasource::TableProviderFactory; use crate::execution::context::QueryPlanner; + use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::expressions::AvgAccumulator; use crate::test; - use crate::test_util::parquet_test_data; + use crate::test_util::{parquet_test_data, TestTableFactory}; use crate::variable::VarType; use arrow::array::ArrayRef; use arrow::datatypes::*; @@ -1959,9 +2018,10 @@ mod tests { use datafusion_expr::{create_udaf, create_udf, Expr, Volatility}; use datafusion_physical_expr::functions::make_scalar_function; use std::fs::File; + use std::path::PathBuf; use std::sync::Weak; use std::thread::{self, JoinHandle}; - use std::{io::prelude::*, sync::Mutex}; + use std::{env, io::prelude::*, sync::Mutex}; use tempfile::TempDir; #[tokio::test] @@ -2199,6 +2259,41 @@ mod tests { Ok(()) } + #[tokio::test] + async fn with_listing_schema_provider() -> Result<()> { + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let path = path.join("tests/tpch-csv"); + let url = format!("file://{}", path.display()); + + let mut table_factories: HashMap> = + HashMap::new(); + table_factories.insert("test".to_string(), Arc::new(TestTableFactory {})); + let rt_cfg = RuntimeConfig::new().with_table_factories(table_factories); + let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap()); + let cfg = SessionConfig::new() + .set_str("datafusion.catalog.location", url.as_str()) + .set_str("datafusion.catalog.type", "test"); + let session_state = SessionState::with_config_rt(cfg, runtime); + let ctx = SessionContext::with_state(session_state); + + let mut table_count = 0; + for cat_name in ctx.catalog_names().iter() { + let cat = ctx.catalog(cat_name).unwrap(); + for s_name in cat.schema_names().iter() { + let schema = cat.schema(s_name).unwrap(); + if let Some(listing) = + schema.as_any().downcast_ref::() + { + listing.refresh().await.unwrap(); + table_count = schema.table_names().len(); + } + } + } + + assert_eq!(table_count, 8); + Ok(()) + } + #[tokio::test] async fn custom_query_planner() -> Result<()> { let runtime = Arc::new(RuntimeEnv::default()); diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs index 8d05637170409..3033f50ed4753 100644 --- a/datafusion/core/tests/sql/information_schema.rs +++ b/datafusion/core/tests/sql/information_schema.rs @@ -698,6 +698,8 @@ async fn show_all() { "+-------------------------------------------------+---------+", "| name | setting |", "+-------------------------------------------------+---------+", + "| datafusion.catalog.location | NULL |", + "| datafusion.catalog.type | NULL |", "| datafusion.execution.batch_size | 8192 |", "| datafusion.execution.coalesce_batches | true |", "| datafusion.execution.coalesce_target_batch_size | 4096 |", diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 2404131da3d02..0e2810ef28687 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -37,16 +37,15 @@ Environment variables are read during `SessionConfig` initialisation so they mus | key | type | default | description | | ----------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.location | Utf8 | NULL | Location scanned to load tables for `default` schema | +| datafusion.catalog.type | Utf8 | NULL | Type of `TableProvider` to use when loading `default` schema | | datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption. | | datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. | | datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. | | datafusion.execution.parquet.enable_page_index | Boolean | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. | | datafusion.execution.parquet.pushdown_filters | Boolean | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. | | datafusion.execution.parquet.reorder_filters | Boolean | false | If true, filter expressions evaluated during the parquet decoding opearation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. | -| datafusion.execution.time_zone | Utf8 | UTC | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, | -| then extract the hour. | -| datafusion.execution.time_zone | Utf8 | UTC | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, | -| then extract the hour | +| datafusion.execution.time_zone | Utf8 | UTC | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, then extract the hour. | | datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | | datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | | datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |