An asynchronous version of `CatalogList`/`CatalogProvider`/`SchemaProvider` #3777
I would also be interested in this feature. I tried using
I found a workaround. In this case you can spawn a thread dedicated to blocking:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::RwLock;

trait CatalogList {
    fn catalog(&self, name: &str) -> Option<Arc<CatalogProvider>>;
}

struct CatalogProvider;

#[derive(Default)]
struct CatalogListImpl {
    map: Arc<RwLock<HashMap<String, Arc<CatalogProvider>>>>,
}

impl CatalogList for CatalogListImpl {
    fn catalog(&self, name: &str) -> Option<Arc<CatalogProvider>> {
        let map = self.map.clone();
        let name = name.to_string();
        // Spawn a dedicated OS thread so that `block_on` never runs on a
        // tokio worker thread, which would panic or deadlock the runtime.
        std::thread::spawn(move || {
            futures::executor::block_on(async {
                map.read().await.get(&name).cloned()
            })
        })
        .join()
        .unwrap()
    }
}

#[tokio::main]
async fn main() {
    let catalog_list = CatalogListImpl::default();
    assert!(catalog_list.catalog("hello").is_none());
}
```
Hi, thanks for the quick reply. I'm trying to implement an Iceberg catalog for DataFusion. It functions much like a common catalog with schemas (namespaces) and table identifiers. From what I understood about the
Awesome, thanks for the heads-up. It is probably not the cleanest thing to spawn another thread while the rest of the application is executed with tokio, but it should work for the time being.
I had a look at the code and async versions of The solution with Investigating the I'm not sure if that is a route to go, but I could create a branch with the changes that would need to be included in datafusion.
I am not 100% familiar but I know there has been a bunch of work recently to support delta-lake style catalogs in DataFusion using Maybe someone like @avantgardnerio @timvw @milenkovicm who worked on these features more directly has some other suggestions Some related PRs: Here is The stuff in IOx predates any of these new interfaces, so I would likely not recommend it going forward cc @crepererum
IMHO it makes sense to make the catalog interfaces async. FWIW IOx gets around this because we "discover" the whole catalog/tables/schemas before we build the DataFusion data structures. This mostly works because our APIs are scoped in a way that allows that. However, mid-term I think even IOx could be improved by async schema exploration, because this would lower the amount of data we need to unconditionally prefetch/prepare (e.g. we would only need to construct schemas for tables that are actually touched by a query).
I tend to agree with @crepererum, most of the factory methods are change may not be a small one, as it will end up changing
In https://github.com/datafusion-contrib/datafusion-catalogprovider-glue we also first "discover/fetch" the tables/schemas, and from then on it is "static/cached". Async would enable a whole set of use cases (e.g. refreshing catalog parts).
I made an attempt to refactor
I'm not opposed, as long as we are mindful not to break Ballista (which I don't think we will). FWIW, when we needed this for Ballista, we just added the
The issue with
Makes sense. Also
If there is a consensus that an async interface is preferable, I can try to make the necessary changes.
I recommend we send a note to the mailing list / on Slack to see if we can make sure we have wide consensus. I remember last time we tried proposing something like this there was non-trivial pushback.
I am happy to help with that communication.
Vote for the async interface. Although some workarounds can achieve the needed functionality, an async fetch method looks more intuitive to me. On the positive side, the existing sync code inside those providers is compatible with the async version (but there is still a lot of work to do in other places 🥲).
That would be great. Should we draft a more detailed spec for the communication or is it clear as is?
I think a more concise proposal would be a great idea. Perhaps basically explain what methods would become async, and we can write a brief intro about what the use case is (remote catalogs that might need to make async calls to fetch information).
**Is your feature request related to a problem or challenge? Please describe**
The traits

**Describe the solution you'd like**
The same traits with asynchronous functions that additionally include error handling (return `Result`):

```rust
#[async_trait]
pub trait CatalogList: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>
    ) -> Result<Option<Arc<dyn CatalogProvider>>>;
    async fn catalog_names(&self) -> Result<Vec<String>>;
    async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>>;
}

#[async_trait]
pub trait CatalogProvider: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn schema_names(&self) -> Result<Vec<String>>;
    async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>>;
    async fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>
    ) -> Result<Option<Arc<dyn SchemaProvider>>> { ... }
}

#[async_trait]
pub trait SchemaProvider: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn table_names(&self) -> Result<Vec<String>>;
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>>;
    async fn table_exist(&self, name: &str) -> Result<bool>;
    async fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>
    ) -> Result<Option<Arc<dyn TableProvider>>> { ... }
    async fn deregister_table(
        &self,
        name: &str
    ) -> Result<Option<Arc<dyn TableProvider>>> { ... }
}
```

**Consequences**
These traits are extensively used across the datafusion crate and would require
Due to the changes in the public interface of datafusion the proposal will lead

**Describe alternatives you've considered**
Here is a draft for a proposal. Feel free to add anything if I forgot something.
Looks great -- thank you @JanKaul -- I sent out an "FYI" email / Slack notification and hopefully we'll get enough consensus over the next few days to get a sense of how to proceed.
Regarding all three crates: it seems weird to me that we mix state access (i.e. listing and requesting objects) with modifiers. If you have a completely static / DB-backed / remote catalog, it's unlikely that the modifiers make sense. Instead, modifications (i.e. when/how/if sub-units are found) are a mere implementation detail of the provider/list. In many cases, you may not be able to modify the provider at all. So I would propose to maybe split the modifiers from the actual provider traits? Like:

```rust
#[async_trait]
pub trait CatalogList: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn catalog_names(&self) -> Result<Vec<String>>;
    async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>>;
}

#[async_trait]
pub trait CatalogListWriter: CatalogList {
    fn as_any(&self) -> &dyn Any;
    async fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>
    ) -> Result<Option<Arc<dyn CatalogProvider>>>;
    async fn deregister_catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>>;
}
```

Also, since the providers now may perform IO, this is a good opportunity to introduce error handling (e.g. make all methods return a `Result`).
Good point, I agree 👍
This makes sense in some perspectives, but how would the proposed traits ( For the situation "not be able to modify the provider" another way is to keep the definition as-is, and leave the detail to the implementor. If it (like a completely static catalog) cannot do modification, it can return an error and fail this operation. But I'm not very familiar with other use cases like iceberg catalog, would like to hear more opinions (and thanks in advance).
Sounds good to me. If all modifiers return
I have updated my proposal accordingly.
Does it make sense to make the interface async at both the
I think the current The
That's also a good point. I guess this would work for most use cases. On the other hand, I think the biggest burden is changing the public interface of datafusion and updating the dependencies. If the dependencies have to be updated due to changes in
It looks like no one was opposed to the proposal. I guess I can start the implementation.
After a more detailed look at the source code I would like to come back to your suggestion @thinkharderdev. If we use async traits for I would like to get your opinion on which of the following two solutions you prefer:
Looking at the interface of The constraint then becomes: what information is needed to construct a My question is therefore, are there use cases where the number of tables exceeds what can be stored in memory? If not, I don't see a compelling reason to make A similar argument could be made for Edit: Apologies for not commenting sooner, I've had too many plates spinning lately to keep track of all these things, but seeing the complexity explosion this results in, with self-recursive async functions etc., I'm a little bit apprehensive about this... Edit Edit: Making TableProvider::schema async runs into the same issue of ending up making the planner async. I'll keep playing around.
I think the biggest issue with making this async shows up here:

```rust
impl ContextProvider for SessionState {
    fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
        let resolved_ref = self.resolve_table_ref(name);
        match self.schema_for_ref(resolved_ref) {
            Ok(schema) => {
                let provider = schema.table(resolved_ref.table).ok_or_else(|| { // <= async would be introduced here
                    DataFusionError::Plan(format!(
                        "table '{}.{}.{}' not found",
                        resolved_ref.catalog, resolved_ref.schema, resolved_ref.table
                    ))
                })?;
                Ok(provider_as_source(provider))
            }
            Err(e) => Err(e),
        }
    }
    ...
}
```
Actually, after rethinking my earlier comment, I think it could make sense to make One problem with this is that external changes to the underlying storage system are not automatically reflected in the in-memory mirror, so one would need to make sure to trigger a "synchronisation" operation before using the trait. Additionally, one could provide sync and async versions of the reading methods, allowing operations that want to be sure the mirror is up to date to use the async versions.
Sorry for the delay, I plan to spend some time on this this week and try to come up with a proposal. I think the key thing is to make it easy and obvious how to hook in a remote catalog, without requiring all of planning to become async. I'll see what I can come up with.
#4607 is now ready for review, PTAL. Hopefully we can get this in to make the release at the end of this week 😄
* Make SchemaProvider async (#3777)
* Cleanup
* Workaround for ShowCreate
* Clippy and doctest
* Update sqllogictests
* Review feedback
* Format
Thank you for your efforts!
**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
I'm trying to implement a `CatalogList`/`CatalogProvider`/`SchemaProvider` that is based on some remote storage service like ZooKeeper/Etcd.

**Describe the solution you'd like**
But the current `CatalogList`/`CatalogProvider`/`SchemaProvider` traits are sync:
https://github.com/apache/arrow-datafusion/blob/e54110fb592e03704da5f6ebd832b8fe1c51123b/datafusion/core/src/catalog/catalog.rs#L29-L47
That makes asynchronously accessing a remote storage service impossible while implementing `CatalogList`/`CatalogProvider`/`SchemaProvider`.

**Describe alternatives you've considered**
Maybe change these traits to async or provide an async version?

**Additional context**
I see projects like influxdb-iox did some hacks to get around this problem.