Automatically register tables if ObjectStore root is configured #4095

Merged (12 commits) on Nov 4, 2022
163 changes: 163 additions & 0 deletions datafusion/core/src/catalog/listing_schema.rs
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically
use crate::catalog::schema::SchemaProvider;
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::TableProvider;
use datafusion_common::DataFusionError;
use futures::TryStreamExt;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};

/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables
///
/// A subfolder relationship is assumed, i.e. given:
/// authority = s3://host.example.com:3000
/// path = /data/tpch
/// factory = `DeltaTableFactory`
///
/// A table called "customer" will be registered for the folder:
/// s3://host.example.com:3000/data/tpch/customer
///
/// assuming it contains valid deltalake data, i.e.:
/// s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet
/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
pub struct ListingSchemaProvider {
    authority: String,
    path: object_store::path::Path,
    factory: Arc<dyn TableProviderFactory>,
    store: Arc<dyn ObjectStore>,
    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
}

impl ListingSchemaProvider {
    /// Create a new `ListingSchemaProvider`
    ///
    /// Arguments:
    /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
    /// `path`: The root path that contains subfolders which represent tables
    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
    /// `store`: The `ObjectStore` containing the table data
    pub fn new(
        authority: String,
        path: object_store::path::Path,
        factory: Arc<dyn TableProviderFactory>,
        store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            authority,
            path,
            factory,
            store,
            tables: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Reload table information from ObjectStore.
    ///
    /// Note that this must be called explicitly after construction -- tables
    /// are not discovered automatically (see the review discussion below).
Comment from a reviewer (Contributor):

I recommend documenting here when refresh() should be called -- for example, that it has to be called explicitly after construction.

Reply from the PR author:

That is the big open question with this PR 😄. Presently, in flight_sql.rs in Ballista, when FlightSql clients enumerate the catalogs/schemas/tables, I do a checked downcast_ref to see if the schema is a ListingSchemaProvider and, if so, call this method. Ultimately, I think we should probably extend the SchemaProvider API? The downcast trick certainly doesn't seem elegant.

Unfortunately, this is a state synchronization problem, and I'm not sure that the ObjectStore has APIs for file system listeners, so we will need to figure out the best times to try to synchronize the state. Every time we run a query, perhaps? My worry is that this could get expensive: I can imagine (or have heard about) each delta table containing 1000 parquet files, and there could be hundreds of tables, which means a lot of files to scan.

Also unfortunately, it looks like the ObjectStore doesn't let us list only the children of a folder - it lists all the files in the entire bucket, hence the weird recursive .parent() logic.

I thought I would file this PR to get the discussion going.

Reply from the reviewer (Contributor):

I agree that there is no policy that will work well for all implementations / use cases, so the refresh policy will need to be decided by whatever the upstream system is (e.g. Ballista or IOx, or whatever).

Adding a refresh() method to SchemaProvider seems fine, though to be honest using downcast_ref() also seems fine to me.

Would definitely be interested in hearing other opinions on this.
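
For concreteness, one shape the trait extension floated here could take -- purely a hedged sketch, not part of this PR (the PR keeps refresh() on the concrete type and reaches it through downcast_ref):

use async_trait::async_trait;

/// Hypothetical refresh hook (NOT in this PR). A default no-op body would let
/// existing SchemaProvider implementations compile unchanged, while providers
/// backed by external storage override it to re-scan for tables.
#[async_trait]
pub trait RefreshableSchemaProvider {
    async fn refresh(&self) -> datafusion_common::Result<()> {
        Ok(())
    }
}

Whether something like this lives on SchemaProvider itself or as a separate opt-in trait is exactly the open design question above.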

    pub async fn refresh(&self) -> datafusion_common::Result<()> {
        let entries: Vec<_> = self
            .store
            .list(Some(&self.path))
            .await?
            .try_collect()
            .await?;
        let base = Path::new(self.path.as_ref());
        let mut tables = HashSet::new();
        for file in entries.iter() {
            let mut parent = Path::new(file.location.as_ref());
            while let Some(p) = parent.parent() {
                if p == base {
                    tables.insert(parent);
                }
                parent = p;
            }
        }
        for table in tables.iter() {
            let file_name = table
                .file_name()
                .ok_or_else(|| {
                    DataFusionError::Internal("Cannot parse file name!".to_string())
                })?
                .to_str()
                .ok_or_else(|| {
                    DataFusionError::Internal("Cannot parse file name!".to_string())
                })?;
            let table_name = table.to_str().ok_or_else(|| {
                DataFusionError::Internal("Cannot parse file name!".to_string())
            })?;
            if !self.table_exist(file_name) {
                let table_name = format!("{}/{}", self.authority, table_name);
                let provider = self.factory.create(table_name.as_str()).await?;
                let _ = self.register_table(file_name.to_string(), provider.clone())?;
            }
        }
        Ok(())
    }
}
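
Concretely, the parent-walk in refresh() records exactly the direct children of the configured root. A standalone trace of the same logic over plain std::path values (a sketch; the real code walks ObjectStore listing paths):

use std::collections::HashSet;
use std::path::Path;

fn main() {
    let base = Path::new("data/tpch"); // the configured root
    let file = Path::new("data/tpch/customer/part-00000-xxxx.snappy.parquet");

    let mut tables = HashSet::new();
    let mut parent = file;
    while let Some(p) = parent.parent() {
        // only a direct child of `base` is recorded as a table folder
        if p == base {
            tables.insert(parent);
        }
        parent = p;
    }
    assert!(tables.contains(Path::new("data/tpch/customer")));
}

This is the workaround for the listing limitation described in the review thread: since the store returns every file under the root, each file's ancestors are walked until one sits immediately below the root.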

impl SchemaProvider for ListingSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        self.tables
            .lock()
            .expect("Can't lock tables")
            .keys()
            .map(|it| it.to_string())
            .collect()
    }

    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        self.tables
            .lock()
            .expect("Can't lock tables")
            .get(name)
            .cloned()
    }

    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        self.tables
            .lock()
            .expect("Can't lock tables")
            .insert(name, table.clone());
        Ok(Some(table))
    }

    fn deregister_table(
        &self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables
            .lock()
            .expect("Can't lock tables")
            .contains_key(name)
    }
}
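
A minimal usage sketch for the provider as a whole, assuming a local filesystem store; TestTableFactory (used by the test further down) stands in for a real factory such as a DeltaTableFactory:

use std::sync::Arc;
use datafusion::catalog::listing_schema::ListingSchemaProvider;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::test_util::TestTableFactory; // stand-in factory
use object_store::local::LocalFileSystem;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = ListingSchemaProvider::new(
        "file://".to_string(),                                  // scheme + (empty) host
        object_store::path::Path::parse("data/tpch").unwrap(),  // root containing table folders
        Arc::new(TestTableFactory {}),                          // builds one TableProvider per subfolder
        Arc::new(LocalFileSystem::new()),
    );
    schema.refresh().await?; // must be called explicitly; nothing is discovered at construction
    println!("discovered tables: {:?}", schema.table_names());
    Ok(())
}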
1 change: 1 addition & 0 deletions datafusion/core/src/catalog/mod.rs
@@ -21,6 +21,7 @@
#![allow(clippy::module_inception)]
pub mod catalog;
pub mod information_schema;
+pub mod listing_schema;
pub mod schema;

pub use datafusion_sql::{ResolvedTableReference, TableReference};
33 changes: 25 additions & 8 deletions datafusion/core/src/config.rs
@@ -68,6 +68,12 @@ pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
/// Configuration option "datafusion.optimizer.max_passes"
pub const OPT_OPTIMIZER_MAX_PASSES: &str = "datafusion.optimizer.max_passes";

+/// Location scanned to load tables for `default` schema
+pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";
+
+/// Type of `TableProvider` to use when loading `default` schema
+pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";
+
/// Definition of a configuration option
pub struct ConfigDefinition {
    /// key used to identifier this configuration option
@@ -144,13 +150,13 @@
    pub fn new_string(
        key: impl Into<String>,
        description: impl Into<String>,
-        default_value: String,
+        default_value: Option<String>,
    ) -> Self {
        Self::new(
            key,
            description,
            DataType::Utf8,
-            ScalarValue::Utf8(Some(default_value)),
+            ScalarValue::Utf8(default_value),
        )
    }
}
@@ -217,7 +223,7 @@ impl BuiltInConfigs {
            "The session time zone which some function require \
             e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
             then extract the hour.",
-            "UTC".into()
+            Some("UTC".into()),
        ),
        ConfigDefinition::new_bool(
            OPT_PARQUET_PUSHDOWN_FILTERS,
@@ -245,11 +251,22 @@
            rule. When set to false, any rules that produce errors will cause the query to fail.",
            true
        ),
-        ConfigDefinition::new_u64(
-            OPT_OPTIMIZER_MAX_PASSES,
-            "Number of times that the optimizer will attempt to optimize the plan",
-            3
-        )]
+        ConfigDefinition::new_u64(
+            OPT_OPTIMIZER_MAX_PASSES,
+            "Number of times that the optimizer will attempt to optimize the plan",
+            3
+        ),
+        ConfigDefinition::new_string(
+            OPT_CATALOG_LOCATION,
+            "Location scanned to load tables for `default` schema, defaults to None",
+            None,
+        ),
+        ConfigDefinition::new_string(
+            OPT_CATALOG_TYPE,
+            "Type of `TableProvider` to use when loading `default` schema. Defaults to None",
+            None,
+        ),
+        ]
    }
}
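
Together with the set_str helper added to SessionConfig in context.rs below, opting in to automatic table discovery is a two-option change. A sketch (the "deltatable" key is an assumption -- it must match whatever key the TableProviderFactory was registered under in the RuntimeEnv):

use datafusion::config::{OPT_CATALOG_LOCATION, OPT_CATALOG_TYPE};
use datafusion::prelude::SessionConfig;

fn catalog_config() -> SessionConfig {
    // both options default to None, so sessions that never set them are unaffected
    SessionConfig::new()
        .set_str(OPT_CATALOG_LOCATION, "s3://host.example.com:3000/data/tpch")
        .set_str(OPT_CATALOG_TYPE, "deltatable") // assumed factory registration key
}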

99 changes: 97 additions & 2 deletions datafusion/core/src/execution/context.rs
@@ -97,7 +97,10 @@ use datafusion_sql::{
    planner::{ContextProvider, SqlToRel},
};
use parquet::file::properties::WriterProperties;
+use url::Url;

+use crate::catalog::listing_schema::ListingSchemaProvider;
+use crate::datasource::object_store::ObjectStoreUrl;
use uuid::Uuid;

use super::options::{
@@ -914,6 +917,11 @@ impl SessionContext {
        state.catalog_list.register_catalog(name, catalog)
    }

+    /// Retrieves the list of available catalog names.
+    pub fn catalog_names(&self) -> Vec<String> {
+        self.state.read().catalog_list.catalog_names()
+    }
+
    /// Retrieves a [`CatalogProvider`] instance by name
    pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.state.read().catalog_list.catalog(name)
@@ -1258,6 +1266,11 @@ impl SessionConfig {
        self.set(key, ScalarValue::UInt64(Some(value)))
    }

+    /// Set a generic `str` configuration option
+    pub fn set_str(self, key: &str, value: &str) -> Self {
+        self.set(key, ScalarValue::Utf8(Some(value.to_string())))
+    }
+
    /// Customize batch size
    pub fn with_batch_size(self, n: usize) -> Self {
        // batch size must be greater than zero
@@ -1508,6 +1521,8 @@ impl SessionState {
            )
            .expect("memory catalog provider can register schema");

+        Self::register_default_schema(&config, &runtime, &default_catalog);
+
        let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
            Arc::new(CatalogWithInformationSchema::new(
                Arc::downgrade(&catalog_list),
@@ -1566,6 +1581,48 @@
        }
    }

+    fn register_default_schema(
+        config: &SessionConfig,
+        runtime: &Arc<RuntimeEnv>,
+        default_catalog: &MemoryCatalogProvider,
+    ) {
+        let url = config
+            .config_options
+            .read()
+            .get("datafusion.catalog.location");
+        let format = config.config_options.read().get("datafusion.catalog.type");
+        let (url, format) = match (url, format) {
+            (Some(url), Some(format)) => (url, format),
+            _ => return,
+        };
+        if url.is_null() || format.is_null() {
+            return;
+        }
+        let url = url.to_string();
+        let format = format.to_string();
+        let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
+        let authority = match url.host_str() {
+            Some(host) => format!("{}://{}", url.scheme(), host),
+            None => format!("{}://", url.scheme()),
+        };
+        let path = &url.as_str()[authority.len() as usize..];
+        let path = object_store::path::Path::parse(path).expect("Can't parse path");
+        let store = ObjectStoreUrl::parse(authority.as_str())
+            .expect("Invalid default catalog url");
+        let store = match runtime.object_store(store) {
+            Ok(store) => store,
+            _ => return,
+        };
+        let factory = match runtime.table_factories.get(format.as_str()) {
+            Some(factory) => factory,
+            _ => return,
+        };
+        let schema = ListingSchemaProvider::new(authority, path, factory.clone(), store);
+        let _ = default_catalog
+            .register_schema("default", Arc::new(schema))
+            .expect("Failed to register default schema");
+    }
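
To make the authority/path split above concrete, a trace with a file URL of the shape the test below constructs (a sketch; behavior follows the url crate):

fn main() {
    let url = url::Url::parse("file:///data/tpch").unwrap();
    assert_eq!(url.scheme(), "file");
    assert_eq!(url.host_str(), None); // file URLs carry no host
    let authority = format!("{}://", url.scheme()); // "file://"
    let path = &url.as_str()[authority.len()..];
    assert_eq!(path, "/data/tpch"); // handed to object_store::path::Path::parse
}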

    fn resolve_table_ref<'a>(
        &'a self,
        table_ref: impl Into<TableReference<'a>>,
@@ -1947,10 +2004,12 @@ mod tests {
mod tests {
    use super::*;
    use crate::assert_batches_eq;
+    use crate::datasource::datasource::TableProviderFactory;
    use crate::execution::context::QueryPlanner;
    use crate::execution::runtime_env::RuntimeConfig;
    use crate::physical_plan::expressions::AvgAccumulator;
    use crate::test;
-    use crate::test_util::parquet_test_data;
+    use crate::test_util::{parquet_test_data, TestTableFactory};
    use crate::variable::VarType;
    use arrow::array::ArrayRef;
    use arrow::datatypes::*;
@@ -1959,9 +2018,10 @@ mod tests {
    use datafusion_expr::{create_udaf, create_udf, Expr, Volatility};
    use datafusion_physical_expr::functions::make_scalar_function;
    use std::fs::File;
+    use std::path::PathBuf;
    use std::sync::Weak;
    use std::thread::{self, JoinHandle};
-    use std::{io::prelude::*, sync::Mutex};
+    use std::{env, io::prelude::*, sync::Mutex};
    use tempfile::TempDir;

    #[tokio::test]
@@ -2199,6 +2259,41 @@ mod tests {
        Ok(())
    }

+    #[tokio::test]
+    async fn with_listing_schema_provider() -> Result<()> {
+        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        let path = path.join("tests/tpch-csv");
+        let url = format!("file://{}", path.display());
+
+        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+            HashMap::new();
+        table_factories.insert("test".to_string(), Arc::new(TestTableFactory {}));
+        let rt_cfg = RuntimeConfig::new().with_table_factories(table_factories);
+        let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
+        let cfg = SessionConfig::new()
+            .set_str("datafusion.catalog.location", url.as_str())
+            .set_str("datafusion.catalog.type", "test");
+        let session_state = SessionState::with_config_rt(cfg, runtime);
+        let ctx = SessionContext::with_state(session_state);
+
+        let mut table_count = 0;
+        for cat_name in ctx.catalog_names().iter() {
+            let cat = ctx.catalog(cat_name).unwrap();
+            for s_name in cat.schema_names().iter() {
+                let schema = cat.schema(s_name).unwrap();
+                if let Some(listing) =
+                    schema.as_any().downcast_ref::<ListingSchemaProvider>()
+                {
+                    listing.refresh().await.unwrap();
+                    table_count = schema.table_names().len();
+                }
+            }
+        }
+
+        assert_eq!(table_count, 8);
+        Ok(())
+    }

    #[tokio::test]
    async fn custom_query_planner() -> Result<()> {
        let runtime = Arc::new(RuntimeEnv::default());