// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

pub use crate::schema::SchemaProvider;
use datafusion_common::not_impl_err;
use datafusion_common::Result;

/// Represents a catalog, comprising a number of named schemas.
///
/// # Catalog Overview
///
/// To plan and execute queries, DataFusion needs a "Catalog" that provides
/// metadata such as which schemas and tables exist, their columns and data
/// types, and how to access the data.
///
/// The Catalog API consists of:
/// * [`CatalogProviderList`]: a collection of `CatalogProvider`s
/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems)
/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems)
/// * [`TableProvider`]: individual tables
///
/// # Implementing Catalogs
///
/// To implement a catalog, implement at least one of the [`CatalogProviderList`],
/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them
/// appropriately with the [`SessionContext`].
///
/// [`SessionContext`]: crate::execution::context::SessionContext
///
/// DataFusion comes with a simple in-memory catalog implementation,
/// [`MemoryCatalogProvider`], that is used by default and has no persistence.
/// DataFusion does not include more complex catalog implementations because
/// catalog management is a key design choice for most data systems, and thus
/// it is unlikely that any general-purpose catalog implementation will work
/// well across many use cases.
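///
/// For example, a minimal catalog backed by an in-memory map might implement
/// [`CatalogProvider`] as sketched below. This is only a sketch: the
/// `MapCatalog` name and the `use` paths are illustrative, not part of
/// DataFusion; the trait methods match the definitions in this module.
///
/// ```ignore
/// use std::any::Any;
/// use std::collections::HashMap;
/// use std::sync::{Arc, RwLock};
///
/// /// A hypothetical catalog that keeps its schemas in an in-memory map,
/// /// similar in spirit to `MemoryCatalogProvider`.
/// struct MapCatalog {
///     schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
/// }
///
/// impl CatalogProvider for MapCatalog {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///
///     fn schema_names(&self) -> Vec<String> {
///         self.schemas.read().unwrap().keys().cloned().collect()
///     }
///
///     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
///         self.schemas.read().unwrap().get(name).cloned()
///     }
///
///     fn register_schema(
///         &self,
///         name: &str,
///         schema: Arc<dyn SchemaProvider>,
///     ) -> Result<Option<Arc<dyn SchemaProvider>>> {
///         // replace any existing schema of the same name and return it
///         let mut schemas = self.schemas.write().unwrap();
///         Ok(schemas.insert(name.to_string(), schema))
///     }
/// }
/// ```
///
/// Such a provider can then be registered with a `SessionContext` via its
/// `register_catalog` method.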
///
/// # Implementing "Remote" catalogs
///
/// Sometimes catalog information is stored remotely and requires a network call
/// to retrieve. For example, the [Delta Lake] table format stores table
/// metadata in files on S3 that must first be downloaded to discover what
/// schemas and tables exist.
///
/// [Delta Lake]: https://delta.io/
///
/// The [`CatalogProvider`] can support this use case, but it takes some care.
/// The planning APIs in DataFusion are not `async`, and thus network IO cannot
/// be performed "lazily" / "on demand" during query planning. The rationale for
/// this design is that using remote procedure calls for all catalog accesses
/// required for query planning would likely result in multiple network calls
/// per plan, resulting in very poor planning performance.
///
/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs,
/// you need to provide an in-memory snapshot of the required metadata. Most
/// systems typically either already have this information cached locally or can
/// batch access to the remote catalog to retrieve multiple schemas and tables
/// in a single network call.
///
/// Note that [`SchemaProvider::table`] is an `async` function in order to
/// simplify implementing simple [`SchemaProvider`]s. For many table formats it
/// is easy to list all available tables, but additional non-trivial access is
/// required to read table details (e.g. statistics).
///
/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
/// the query to [find all table references], perform the required remote
/// catalog accesses in parallel, and then plan the query using that snapshot.
///
/// [find all table references]: resolve_table_references
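///
/// That pattern can be sketched roughly as follows. Everything here is
/// hypothetical pseudocode: `client`, `fetch_snapshot`, and the variable
/// names are illustrative and not a real DataFusion or client API.
///
/// ```ignore
/// // 1. Walk the statement to find all table references
/// let references = resolve_table_references(&statement)?;
///
/// // 2. Fetch the metadata for those references from the remote catalog
/// //    in a single batched (or parallel) call, building an in-memory
/// //    snapshot that implements `CatalogProvider`
/// let snapshot = client.fetch_snapshot(&references).await?;
/// ctx.register_catalog("remote", Arc::new(snapshot));
///
/// // 3. Plan the query against the snapshot; no further network IO is
/// //    needed during planning
/// let df = ctx.sql(&query).await?;
/// ```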
///
/// # Example Catalog Implementations
///
/// Here are some examples of how to implement custom catalogs:
///
/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider
///   that treats files and directories on a filesystem as tables.
///
/// * [`catalog.rs`]: a simple directory-based catalog.
///
/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
///   read from Delta Lake tables.
///
/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html
/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs
/// [delta-rs]: https://github.com/delta-io/delta-rs
/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123
///
/// [`TableProvider`]: crate::datasource::TableProvider
pub trait CatalogProvider: Sync + Send {
    /// Returns the catalog provider as [`Any`]
    /// so that it can be downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Retrieves the list of available schema names in this catalog.
    fn schema_names(&self) -> Vec<String>;

    /// Retrieves a specific schema from the catalog by name, provided it exists.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

    /// Adds a new schema to this catalog.
    ///
    /// If a schema of the same name existed before, it is replaced in
    /// the catalog and returned.
    ///
    /// By default returns a "Not Implemented" error.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // use the variables to avoid unused variable warnings
        let _ = name;
        let _ = schema;
        not_impl_err!("Registering new schemas is not supported")
    }

    /// Removes a schema from this catalog. Implementations of this method should return
    /// errors if the schema exists but cannot be dropped. For example, in DataFusion's
    /// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema
    /// will only be successfully dropped when `cascade` is true.
    /// This is equivalent to how DROP SCHEMA works in PostgreSQL.
    ///
    /// Implementations of this method should return None if a schema with `name`
    /// does not exist.
    ///
    /// By default returns a "Not Implemented" error.
    fn deregister_schema(
        &self,
        _name: &str,
        _cascade: bool,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        not_impl_err!("Deregistering schemas is not supported")
    }
}