Skip to content

Commit

Permalink
Automatically register tables if ObjectStore root is configured (apac…
Browse files Browse the repository at this point in the history
…he#4095)

* squash

* Debug test in CI :'(

* Hashing inconsistency

* Address Andy's concerns

* Docs

* Docs

* fmt

* treat empty string like None :(

* clippy

* PR feedback

* Update datafusion/core/src/config.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/core/src/config.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
2 people authored and Ted-Jiang committed Nov 5, 2022
1 parent 024d543 commit 5922cdd
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 14 deletions.
163 changes: 163 additions & 0 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically
use crate::catalog::schema::SchemaProvider;
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::TableProvider;
use datafusion_common::DataFusionError;
use futures::TryStreamExt;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};

/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables
///
/// A subfolder relationship is assumed, i.e. given:
/// authority = s3://host.example.com:3000
/// path = /data/tpch
/// factory = `DeltaTableFactory`
///
/// A table called "customer" will be registered for the folder:
/// s3://host.example.com:3000/data/tpch/customer
///
/// assuming it contains valid deltalake data, i.e:
/// s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet
/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
pub struct ListingSchemaProvider {
authority: String,
path: object_store::path::Path,
factory: Arc<dyn TableProviderFactory>,
store: Arc<dyn ObjectStore>,
tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
}

impl ListingSchemaProvider {
/// Create a new `ListingSchemaProvider`
///
/// Arguments:
/// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
/// `path`: The root path that contains subfolders which represent tables
/// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
/// `store`: The `ObjectStore` containing the table data
pub fn new(
authority: String,
path: object_store::path::Path,
factory: Arc<dyn TableProviderFactory>,
store: Arc<dyn ObjectStore>,
) -> Self {
Self {
authority,
path,
factory,
store,
tables: Arc::new(Mutex::new(HashMap::new())),
}
}

/// Reload table information from ObjectStore
pub async fn refresh(&self) -> datafusion_common::Result<()> {
let entries: Vec<_> = self
.store
.list(Some(&self.path))
.await?
.try_collect()
.await?;
let base = Path::new(self.path.as_ref());
let mut tables = HashSet::new();
for file in entries.iter() {
let mut parent = Path::new(file.location.as_ref());
while let Some(p) = parent.parent() {
if p == base {
tables.insert(parent);
}
parent = p;
}
}
for table in tables.iter() {
let file_name = table
.file_name()
.ok_or_else(|| {
DataFusionError::Internal("Cannot parse file name!".to_string())
})?
.to_str()
.ok_or_else(|| {
DataFusionError::Internal("Cannot parse file name!".to_string())
})?;
let table_name = table.to_str().ok_or_else(|| {
DataFusionError::Internal("Cannot parse file name!".to_string())
})?;
if !self.table_exist(file_name) {
let table_name = format!("{}/{}", self.authority, table_name);
let provider = self.factory.create(table_name.as_str()).await?;
let _ = self.register_table(file_name.to_string(), provider.clone())?;
}
}
Ok(())
}
}

impl SchemaProvider for ListingSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.tables
.lock()
.expect("Can't lock tables")
.keys()
.map(|it| it.to_string())
.collect()
}

fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.tables
.lock()
.expect("Can't lock tables")
.get(name)
.cloned()
}

fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.tables
.lock()
.expect("Can't lock tables")
.insert(name, table.clone());
Ok(Some(table))
}

fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.lock().expect("Can't lock tables").remove(name))
}

fn table_exist(&self, name: &str) -> bool {
self.tables
.lock()
.expect("Can't lock tables")
.contains_key(name)
}
}
1 change: 1 addition & 0 deletions datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#![allow(clippy::module_inception)]
pub mod catalog;
pub mod information_schema;
pub mod listing_schema;
pub mod schema;

pub use datafusion_sql::{ResolvedTableReference, TableReference};
33 changes: 25 additions & 8 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
/// Configuration option "datafusion.optimizer.max_passes"
pub const OPT_OPTIMIZER_MAX_PASSES: &str = "datafusion.optimizer.max_passes";

/// Location scanned to load tables for `default` schema
pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";

/// Type of `TableProvider` to use when loading `default` schema
pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identifier this configuration option
Expand Down Expand Up @@ -144,13 +150,13 @@ impl ConfigDefinition {
pub fn new_string(
key: impl Into<String>,
description: impl Into<String>,
default_value: String,
default_value: Option<String>,
) -> Self {
Self::new(
key,
description,
DataType::Utf8,
ScalarValue::Utf8(Some(default_value)),
ScalarValue::Utf8(default_value),
)
}
}
Expand Down Expand Up @@ -217,7 +223,7 @@ impl BuiltInConfigs {
"The session time zone which some function require \
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
then extract the hour.",
"UTC".into()
Some("UTC".into()),
),
ConfigDefinition::new_bool(
OPT_PARQUET_PUSHDOWN_FILTERS,
Expand Down Expand Up @@ -245,11 +251,22 @@ impl BuiltInConfigs {
rule. When set to false, any rules that produce errors will cause the query to fail.",
true
),
ConfigDefinition::new_u64(
OPT_OPTIMIZER_MAX_PASSES,
"Number of times that the optimizer will attempt to optimize the plan",
3
)]
ConfigDefinition::new_u64(
OPT_OPTIMIZER_MAX_PASSES,
"Number of times that the optimizer will attempt to optimize the plan",
3
),
ConfigDefinition::new_string(
OPT_CATALOG_LOCATION,
"Location scanned to load tables for `default` schema, defaults to None",
None,
),
ConfigDefinition::new_string(
OPT_CATALOG_TYPE,
"Type of `TableProvider` to use when loading `default` schema. Defaults to None",
None,
),
]
}
}

Expand Down
99 changes: 97 additions & 2 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ use datafusion_sql::{
planner::{ContextProvider, SqlToRel},
};
use parquet::file::properties::WriterProperties;
use url::Url;

use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use uuid::Uuid;

use super::options::{
Expand Down Expand Up @@ -914,6 +917,11 @@ impl SessionContext {
state.catalog_list.register_catalog(name, catalog)
}

/// Retrieves the list of available catalog names.
pub fn catalog_names(&self) -> Vec<String> {
self.state.read().catalog_list.catalog_names()
}

/// Retrieves a [`CatalogProvider`] instance by name
pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
self.state.read().catalog_list.catalog(name)
Expand Down Expand Up @@ -1258,6 +1266,11 @@ impl SessionConfig {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// Set a generic `str` configuration option
pub fn set_str(self, key: &str, value: &str) -> Self {
self.set(key, ScalarValue::Utf8(Some(value.to_string())))
}

/// Customize batch size
pub fn with_batch_size(self, n: usize) -> Self {
// batch size must be greater than zero
Expand Down Expand Up @@ -1508,6 +1521,8 @@ impl SessionState {
)
.expect("memory catalog provider can register schema");

Self::register_default_schema(&config, &runtime, &default_catalog);

let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&catalog_list),
Expand Down Expand Up @@ -1566,6 +1581,48 @@ impl SessionState {
}
}

fn register_default_schema(
config: &SessionConfig,
runtime: &Arc<RuntimeEnv>,
default_catalog: &MemoryCatalogProvider,
) {
let url = config
.config_options
.read()
.get("datafusion.catalog.location");
let format = config.config_options.read().get("datafusion.catalog.type");
let (url, format) = match (url, format) {
(Some(url), Some(format)) => (url, format),
_ => return,
};
if url.is_null() || format.is_null() {
return;
}
let url = url.to_string();
let format = format.to_string();
let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
let authority = match url.host_str() {
Some(host) => format!("{}://{}", url.scheme(), host),
None => format!("{}://", url.scheme()),
};
let path = &url.as_str()[authority.len() as usize..];
let path = object_store::path::Path::parse(path).expect("Can't parse path");
let store = ObjectStoreUrl::parse(authority.as_str())
.expect("Invalid default catalog url");
let store = match runtime.object_store(store) {
Ok(store) => store,
_ => return,
};
let factory = match runtime.table_factories.get(format.as_str()) {
Some(factory) => factory,
_ => return,
};
let schema = ListingSchemaProvider::new(authority, path, factory.clone(), store);
let _ = default_catalog
.register_schema("default", Arc::new(schema))
.expect("Failed to register default schema");
}

fn resolve_table_ref<'a>(
&'a self,
table_ref: impl Into<TableReference<'a>>,
Expand Down Expand Up @@ -1947,10 +2004,12 @@ impl FunctionRegistry for TaskContext {
mod tests {
use super::*;
use crate::assert_batches_eq;
use crate::datasource::datasource::TableProviderFactory;
use crate::execution::context::QueryPlanner;
use crate::execution::runtime_env::RuntimeConfig;
use crate::physical_plan::expressions::AvgAccumulator;
use crate::test;
use crate::test_util::parquet_test_data;
use crate::test_util::{parquet_test_data, TestTableFactory};
use crate::variable::VarType;
use arrow::array::ArrayRef;
use arrow::datatypes::*;
Expand All @@ -1959,9 +2018,10 @@ mod tests {
use datafusion_expr::{create_udaf, create_udf, Expr, Volatility};
use datafusion_physical_expr::functions::make_scalar_function;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Weak;
use std::thread::{self, JoinHandle};
use std::{io::prelude::*, sync::Mutex};
use std::{env, io::prelude::*, sync::Mutex};
use tempfile::TempDir;

#[tokio::test]
Expand Down Expand Up @@ -2199,6 +2259,41 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn with_listing_schema_provider() -> Result<()> {
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());

let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
HashMap::new();
table_factories.insert("test".to_string(), Arc::new(TestTableFactory {}));
let rt_cfg = RuntimeConfig::new().with_table_factories(table_factories);
let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.type", "test");
let session_state = SessionState::with_config_rt(cfg, runtime);
let ctx = SessionContext::with_state(session_state);

let mut table_count = 0;
for cat_name in ctx.catalog_names().iter() {
let cat = ctx.catalog(cat_name).unwrap();
for s_name in cat.schema_names().iter() {
let schema = cat.schema(s_name).unwrap();
if let Some(listing) =
schema.as_any().downcast_ref::<ListingSchemaProvider>()
{
listing.refresh().await.unwrap();
table_count = schema.table_names().len();
}
}
}

assert_eq!(table_count, 8);
Ok(())
}

#[tokio::test]
async fn custom_query_planner() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
Expand Down
Loading

0 comments on commit 5922cdd

Please sign in to comment.