An asynchronous version of CatalogList/CatalogProvider/SchemaProvider #3777

Closed
v0y4g3r opened this issue Oct 10, 2022 · 36 comments · Fixed by #4607
Labels
enhancement New feature or request

Comments

@v0y4g3r

v0y4g3r commented Oct 10, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I'm trying to implement a CatalogList/CatalogProvider/SchemaProvider that is based on a remote storage service like ZooKeeper/Etcd.

Describe the solution you'd like
But the current CatalogList/CatalogProvider/SchemaProvider traits are synchronous.

https://github.com/apache/arrow-datafusion/blob/e54110fb592e03704da5f6ebd832b8fe1c51123b/datafusion/core/src/catalog/catalog.rs#L29-L47

That makes it impossible to access a remote storage service asynchronously while implementing CatalogList/CatalogProvider/SchemaProvider.

Describe alternatives you've considered

Maybe change these traits to async or provide an async version?

Additional context
I see that projects like influxdb-iox use some hacks to get around this problem.

@v0y4g3r v0y4g3r added the enhancement New feature or request label Oct 10, 2022
@JanKaul
Contributor

JanKaul commented Nov 9, 2022

I would also be interested in this feature. I tried using futures::executor::block_on, but this leads to my execution hanging, as described in tokio-rs/tokio#2376.

@alamb
Contributor

alamb commented Nov 9, 2022

Hi @JanKaul and @v0y4g3r -- I wonder if the (very) newly added TableProviderFactories may be helpful / fulfill this usecase #4126

@v0y4g3r
Author

v0y4g3r commented Nov 10, 2022

I would also be interested in this feature. I tried using futures::executor::block_on, but this leads to my execution hanging, as described in tokio-rs/tokio#2376.

I found a workaround. In this case you can spawn a thread dedicated to blocking:

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

trait CatalogList {
    fn catalog(&self, name: &str) -> Option<Arc<CatalogProvider>>;
}

struct CatalogProvider;

#[derive(Default)]
struct CatalogListImpl {
    map: Arc<RwLock<HashMap<String, Arc<CatalogProvider>>>>,
}

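impl CatalogList for CatalogListImpl {
    fn catalog(&self, name: &str) -> Option<Arc<CatalogProvider>> {
        // Clone the handles so they can be moved into the helper thread.
        let map = self.map.clone();
        let name = name.to_string();
        // Drive the future with block_on on a dedicated OS thread, then wait
        // for its result; the calling thread blocks until the lookup finishes.
        std::thread::spawn(move || {
            futures::executor::block_on(async {
                map.read().await.get(&name).cloned()
            })
        }).join().unwrap()
    }
}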

#[tokio::main]
async fn main() {
    let catalog_list = CatalogListImpl::default();
    assert!(catalog_list.catalog("hello").is_none());
}

@JanKaul
Contributor

JanKaul commented Nov 10, 2022

Hi @JanKaul and @v0y4g3r -- I wonder if the (very) newly added TableProviderFactories may be helpful / fulfill this usecase #4126

Hi, thanks for the quick reply. I'm trying to implement an iceberg catalog for datafusion. It functions much like a common catalog with schemas (namespaces) and table identifiers. From what I understood about the TableProviderFactory, it wouldn't really fit the use case. One could use the url field to pass in a table identifier like "my_catalog.my_schema.my_table" and get the TableProvider from that. But I don't think this is what it is intended for.

Awesome, thanks for the heads up. It is probably not the cleanest thing to spawn another thread while the rest of the application is executed with tokio, but it should work for the time being.

@JanKaul
Contributor

JanKaul commented Nov 10, 2022

I had a look at the code and async versions of CatalogList/CatalogProvider/SchemaProvider would have a huge impact on the code base and also the external API. A lot would need to be async. I understand that that is not desirable.

The solution with std::thread doesn't work for me because the async function that I'm calling uses hyper which expects an async Context from tokio. I could probably create a new Tokio runtime and spawn a thread from there. But it seems not right to create a second tokio runtime.

Investigating the futures::executor::block_on hang that I was experiencing, it looks like the problem comes from calling a synchronous function that uses block_on from inside an async tokio context. If block_on is called from other synchronous code, it works. The solution would be to wrap the call to the blocking synchronous function in spawn_blocking when calling it from an async context.

I'm not sure if that is a route to go, but I could create a branch with the changes that would need to be included in datafusion.

@alamb
Contributor

alamb commented Nov 10, 2022

I am not 100% familiar but I know there has been a bunch of work recently to support delta-lake style catalogs in DataFusion using ListingTableProviders and ListingSchemaProviders and TableProviderFactories. So my pattern matching mind says "can iceberg use the same approach"

Maybe someone like @avantgardnerio @timvw @milenkovicm who worked on these features more directly has some other suggestions

Some related PRs:
#4125
#4112
#4095

Here is
https://github.com/delta-io/delta-rs/blob/main/rust/src/delta_datafusion.rs

The stuff in IOx predates any of these new interfaces, so I would likely not recommend it going forward

cc @crepererum

@crepererum
Contributor

IMHO it makes sense to make the catalog interfaces async.

FWIW IOx gets around this because we "discover" the whole catalog/tables/schemas before we build the DataFusion data structures. This mostly works because our APIs are scoped in a way that allows that. However, midterm I think even IOx could be improved by async schema exploration, because this would lower the amount of data we need to unconditionally prefetch/prepare (e.g. we would only need to construct schemas for tables that are actually touched by a query).

@milenkovicm
Contributor

milenkovicm commented Nov 10, 2022

IMHO it makes sense to make the catalog interfaces async.

I tend to agree with @crepererum. Most of the factory methods are async, so this would not be a strange change of signature. It would be backward incompatible, but I guess that's not usually a showstopper.

The change may not be a small one, as it will end up changing context.rs methods, ContextProvider ...

@timvw
Contributor

timvw commented Nov 10, 2022

In https://github.com/datafusion-contrib/datafusion-catalogprovider-glue we also first "discover/fetch" the tables/schemas and as of then it's "static/cached".. Async would enable a whole set of use-cases (eg: refreshing catalog(parts)).

@v0y4g3r
Author

v0y4g3r commented Nov 10, 2022

I attempted to refactor the CatalogList/CatalogProvider/... traits to async traits. It turned out to be not that easy; lots of method definitions need to change.

@avantgardnerio
Contributor

Maybe someone like @avantgardnerio

I'm not opposed, as long as we are mindful not to break Ballista (which I don't think we will).

FWIW, when we needed this for Ballista, we just added the async refresh() method and called it at convenient times:

@crepererum
Contributor

The issue with refresh() though is that it only enables a small subset of features, namely async full discovery. It doesn't allow for partial schema discovery. For example, if you have a hive dataset and you know the tables by scanning one directory, but there might be thousands of them (I'm exaggerating here) and you wanna spare yourself reading all the _common_metadata files, because the user may only query a single table. We in IOx have a similar situation where it would be nice if we could get away with only constructing the schema for tables we actually need (it's not a major blocker, but a nice to have).
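
A minimal sketch of the partial discovery idea above: table names are known up front, but a schema is only built the first time a table is touched. Everything here is hypothetical and not DataFusion API: `LazySchemas`, `load_schema` (a stand-in for an expensive step such as reading one `_common_metadata` file), and the use of `String` as a schema. The loader is synchronous only to keep the sketch short; with an async catalog interface it would be an awaited call.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical lazily populated schema cache (not a DataFusion type).
struct LazySchemas {
    known_tables: Vec<String>,
    cache: Mutex<HashMap<String, String>>,
    loads: AtomicUsize,
}

impl LazySchemas {
    fn new(known: &[&str]) -> Self {
        LazySchemas {
            known_tables: known.iter().map(|t| t.to_string()).collect(),
            cache: Mutex::new(HashMap::new()),
            loads: AtomicUsize::new(0),
        }
    }

    /// Stand-in for the expensive per-table step; counts how often it runs.
    fn load_schema(&self, table: &str) -> String {
        self.loads.fetch_add(1, Ordering::SeqCst);
        format!("schema of {}", table)
    }

    /// Returns the schema, loading it on first access only.
    fn schema(&self, table: &str) -> Option<String> {
        if !self.known_tables.iter().any(|t| t == table) {
            return None;
        }
        let mut cache = self.cache.lock().unwrap();
        if let Some(schema) = cache.get(table) {
            return Some(schema.clone());
        }
        let schema = self.load_schema(table);
        cache.insert(table.to_string(), schema.clone());
        Some(schema)
    }

    /// Number of expensive loads performed so far.
    fn loads(&self) -> usize {
        self.loads.load(Ordering::SeqCst)
    }
}
```

With thousands of known tables, a query that touches one table triggers one load; a full refresh() would trigger all of them.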

@avantgardnerio
Contributor

The issue with refresh()

Makes sense. Also refresh() is just "kicking the can down the road". Someone has to call it, the caller needs to be async anyway, so creating a proper async interface would clean things up.
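
For reference, the refresh() pattern discussed here could look roughly like the following sketch. `CachedCatalog`, `fetch_remote_tables`, and the tiny `block_on` helper are assumptions made for illustration; they are not the Ballista or DataFusion code. The helper exists only so the sketch runs with the standard library alone; a real application would await refresh() from its own runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal std-only executor: polls the future on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for a remote catalog call.
async fn fetch_remote_tables() -> Vec<String> {
    vec!["orders".to_string(), "users".to_string()]
}

/// Catalog whose reads are synchronous and served from a local snapshot.
#[derive(Default)]
struct CachedCatalog {
    tables: RwLock<Vec<String>>,
}

impl CachedCatalog {
    /// Async full discovery: fetch first, then swap the whole snapshot.
    async fn refresh(&self) {
        let fresh = fetch_remote_tables().await;
        *self.tables.write().unwrap() = fresh;
    }

    /// Sync read: only sees what the last refresh() loaded.
    fn table_exist(&self, name: &str) -> bool {
        self.tables.read().unwrap().iter().any(|t| t == name)
    }
}
```

Reads never wait on the remote service, but they only see what the last refresh() loaded, and some async caller still has to trigger it.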

@JanKaul
Contributor

JanKaul commented Nov 10, 2022

If there is a consensus that an async interface is preferable, I can try to make the necessary changes.

@alamb
Contributor

alamb commented Nov 10, 2022

I recommend we send a note to the mailing list / on slack to make sure we have wide consensus

I remember last time we tried proposing something like this there was non trivial pushback

@alamb
Contributor

alamb commented Nov 10, 2022

I am happy to help with that communication

@waynexia
Member

I vote for the async interface. Although some workarounds can achieve the needed functionality, an async fetch method looks more intuitive to me. On the positive side, the existing sync code inside those providers is compatible with the async version (but there is still lots of work to do in other places 🥲).

@JanKaul
Contributor

JanKaul commented Nov 10, 2022

I am happy to help with that communication

That would be great. Should we draft a more detailed spec for the communication or is it clear as is?

@alamb
Contributor

alamb commented Nov 10, 2022

That would be great. Should we draft a more detailed spec for the communication or is it clear as is?

I think a more concise proposal would be a great idea. Perhaps basically explain what methods would become async and we can write a brief intro about what the usecase is (remote catalogs that might need to make async calls to fetch information)

@JanKaul
Contributor

JanKaul commented Nov 10, 2022

Is your feature request related to a problem or challenge? Please describe
what you are trying to do.

The traits CatalogList/CatalogProvider/SchemaProvider currently provide a
synchronous interface for catalog and schema information. But due to their
synchronous nature it is not possible to implement the traits for use cases
where the information cannot simply be stored in memory. These use cases include
catalog implementations based on:

  • ZooKeeper/Etcd
  • Databases
  • Catalogs for Delta_lake/Iceberg
  • API endpoints

Describe the solution you'd like

The same traits with asynchronous functions that additionally include error handling (return Result). The proposed traits would look like
the following:

#[async_trait]
pub trait CatalogList: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>
    ) -> Result<Option<Arc<dyn CatalogProvider>>>;
    async fn catalog_names(&self) -> Result<Vec<String>>;
    async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>>;
}
#[async_trait]
pub trait CatalogProvider: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn schema_names(&self) -> Result<Vec<String>>;
    async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>>;

    async fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>
    ) -> Result<Option<Arc<dyn SchemaProvider>>> { ... }
}
#[async_trait]
pub trait SchemaProvider: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn table_names(&self) -> Result<Vec<String>>;
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>>;
    async fn table_exist(&self, name: &str) -> Result<bool>;

    async fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>
    ) -> Result<Option<Arc<dyn TableProvider>>> { ... }
    async fn deregister_table(
        &self,
        name: &str
    ) -> Result<Option<Arc<dyn TableProvider>>> { ... }
}

Consequences

These traits are extensively used across the datafusion crate and would require
many internal changes. Additionally, the proposal leads to changes in the public
interface of datafusion. SessionContext would probably be affected the
most by the change. Methods like catalog, register_catalog, table,
register_table will also be async.

Due to the changes in the public interface of datafusion the proposal will lead
to breaking changes for dependent crates such as ballista.

Describe alternatives you've considered

  1. Async call beforehand
    • fetch async information beforehand and provide synchronous interface
    • potentially large overhead
    • doesn't account for catalog/schema changes
  2. use futures::executor::block_on
    • tricky to get it working with tokio
    • potentially exposes complexity to the user

@JanKaul
Contributor

JanKaul commented Nov 10, 2022

Here is a draft for a proposal. Feel free to add anything if I forgot something

@alamb
Contributor

alamb commented Nov 11, 2022

Looks great -- thank you @JanKaul -- I sent out a "FYI" email / slack notifications and hopefully we'll get enough consensus over the next few days to get a sense of how to proceed

@crepererum
Contributor

Regarding all three traits: it seems weird to me that we mix state access (i.e. listing and requesting objects) with modifiers. If you have a completely static / DB-backed / remote catalog, it's unlikely that the modifiers make sense. Instead, modification (i.e. when/how/if sub-units are found) is a mere implementation detail of the provider/list. In many cases, you may not be able to modify the provider at all. So I would propose to maybe split the modifiers from the actual provider traits? Like:

#[async_trait]
pub trait CatalogList: Sync + Send {
    fn as_any(&self) -> &dyn Any;
    async fn catalog_names(&self) -> Result<Vec<String>>;
    async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>>;
}

#[async_trait]
pub trait CatalogListWriter: CatalogList {
    fn as_any(&self) -> &dyn Any;

    async fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>
    ) -> Result<Option<Arc<dyn CatalogProvider>>>;
    async fn deregister_catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>>;
}

Also since the providers now may perform IO, this is a good opportunity to introduce error handling (e.g. make all methods return a Result).

@waynexia
Member

Also since the providers now may perform IO, this is a good opportunity to introduce error handling (e.g. make all methods return a Result).

Good point, I agree 👍

So I would propose to maybe split the modifiers from the actual provider traits? Like:

This makes sense from some perspectives, but how would the proposed traits (CatalogList and CatalogListWriter) be used? I.e., which one should SessionState require? Considering that the access operations are currently modeled together with the modify operations, if we split CatalogList then SessionState (or the upper-level operations) may also need to be split.

For the situation "not be able to modify the provider" another way is to keep the definition as-is, and leave the detail to the implementor. If it (like a completely static catalog) can not do modification, it can return an error and fail this operation. But I'm not very familiar with other use cases like iceberg catalog, would like to hear more opinions (and thanks in advance).

@crepererum
Contributor

For the situation "not be able to modify the provider" another way is to keep the definition as-is, and leave the detail to the implementor. If it (like a completely static catalog) can not do modification, it can return an error and fail this operation. But I'm not very familiar with other use cases like iceberg catalog, would like to hear more opinions (and thanks in advance).

Sounds good to me. If all modifiers return Result, we can easily return DataFusionError::NotImplemented 👍
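
A rough sketch of that idea, under simplifying assumptions: the trait below is synchronous and uses `String` in place of `Arc<dyn TableProvider>`, and `CatalogError` is a hypothetical stand-in that mirrors only the `NotImplemented` variant of DataFusionError. `StaticSchema` is likewise an illustrative name.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for DataFusionError; only the variant used here.
#[derive(Debug, PartialEq)]
enum CatalogError {
    NotImplemented(String),
}

/// Simplified schema trait: reads are required, modifiers default to an error.
trait SchemaProvider {
    fn table(&self, name: &str) -> Result<Option<String>, CatalogError>;

    /// Providers are read-only unless they override this method.
    fn register_table(
        &mut self,
        name: String,
        _table: String,
    ) -> Result<Option<String>, CatalogError> {
        Err(CatalogError::NotImplemented(format!(
            "cannot register table '{}': schema is read-only",
            name
        )))
    }
}

/// A completely static catalog: implements lookups only.
struct StaticSchema {
    tables: HashMap<String, String>,
}

impl StaticSchema {
    fn new(entries: &[(&str, &str)]) -> Self {
        StaticSchema {
            tables: entries
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        }
    }
}

impl SchemaProvider for StaticSchema {
    fn table(&self, name: &str) -> Result<Option<String>, CatalogError> {
        Ok(self.tables.get(name).cloned())
    }
}
```

The trait keeps a single definition, and a read-only implementor fails the modifying call at run time instead of needing a separate writer trait.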

@JanKaul
Contributor

JanKaul commented Nov 11, 2022

I have updated my proposal accordingly

@thinkharderdev
Contributor

Does it make sense to make the interface async at both the CatalogProvider and SchemaProvider level? I haven't really looked through the concrete use cases but it seems like we could make SchemaProvider async but still have CatalogProvider be sync. If a particular implementation needed to prefetch metadata (meta-metadata?) then that could be something handled at construction for that implementation.

@alamb
Contributor

alamb commented Nov 14, 2022

I think the current async proposal returning Result looks good to me

The Result<Option<_>> is always a little hard to grok at first glance, but I think it does make sense
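
To make the three possible outcomes of such a return type concrete, here is a small sketch. `RemoteCatalog`, `LookupError`, and the `u32` table id are hypothetical placeholders and not DataFusion types.

```rust
use std::collections::HashMap;

/// Hypothetical error for a failed lookup (e.g. the remote service is down).
#[derive(Debug, PartialEq)]
enum LookupError {
    Unavailable(String),
}

/// Hypothetical catalog that may or may not be reachable.
struct RemoteCatalog {
    reachable: bool,
    tables: HashMap<String, u32>,
}

impl RemoteCatalog {
    fn new(reachable: bool, entries: &[(&str, u32)]) -> Self {
        RemoteCatalog {
            reachable,
            tables: entries.iter().map(|(k, v)| (k.to_string(), *v)).collect(),
        }
    }

    /// Err: the lookup itself failed. Ok(None): it worked, no such table.
    fn table(&self, name: &str) -> Result<Option<u32>, LookupError> {
        if !self.reachable {
            return Err(LookupError::Unavailable(
                "catalog service unreachable".to_string(),
            ));
        }
        Ok(self.tables.get(name).copied())
    }
}

/// Spells out the three cases a caller has to handle.
fn describe(result: Result<Option<u32>, LookupError>) -> String {
    match result {
        Ok(Some(id)) => format!("found table with id {}", id),
        Ok(None) => "lookup succeeded, table does not exist".to_string(),
        Err(LookupError::Unavailable(why)) => format!("lookup failed: {}", why),
    }
}
```

The outer Result answers "did the lookup work?" and the inner Option answers "does the table exist?", which a plain Option cannot distinguish once IO is involved.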

@JanKaul
Contributor

JanKaul commented Nov 16, 2022

Does it make sense to make the interface async at both the CatalogProvider and SchemaProvider level? I haven't really looked through the concrete use cases but it seems like we could make SchemaProvider async but still have CatalogProvider be sync. If a particular implementation needed to prefetch metadata (meta-metadata?) then that could be something handled at construction for that implementation.

That's also a good point. I guess this would work for most use cases. On the other hand, I think the biggest burden is to change the public interface of datafusion and update the dependencies. If the dependencies have to be updated due to changes in SchemaProvider, I think also updating the interface for CatalogProvider is just a minor issue. And one avoids a potential breaking change in the future, in case there are use cases for an async CatalogProvider. Overall, I think having an async interface is a cleaner solution.

@JanKaul
Contributor

JanKaul commented Nov 16, 2022

It looks like no one was opposed to the proposal. I guess I can start the implementation.

@JanKaul
Contributor

JanKaul commented Nov 17, 2022

Does it make sense to make the interface async at both the CatalogProvider and SchemaProvider level? I haven't really looked through the concrete use cases but it seems like we could make SchemaProvider async but still have CatalogProvider be sync. If a particular implementation needed to prefetch metadata (meta-metadata?) then that could be something handled at construction for that implementation.

After a more detailed look at the source code I would like to come back to your suggestion @thinkharderdev. If we use async traits for CatalogProvider and CatalogListProvider the new constructors of SessionState and SessionContext will be async. Maybe that was obvious, but I didn't think of it.

I would like to get your opinion on which of the following two solutions you prefer:

  1. SchemaProvider, CatalogProvider and CatalogListProvider async
    => SessionContext::new() and SessionState::new() are async

  2. only SchemaProvider async
    => less flexibility with CatalogProvider and CatalogListProvider

React with:
🎉 for solution 1 and
🚀 for solution 2

@tustvold
Contributor

tustvold commented Nov 28, 2022

where the information cannot simply be stored in memory.

Looking at the interface of SchemaProvider the only interface it needs is to provide access to TableProvider by name, it doesn't actually need any more information than this.

The constraint then becomes, what information is needed to construct a TableProvider, which boils down to what information TableProvider needs to be able to provide. Currently this is just the schema, there is support for statistics but I'm not sure this is exploited anywhere.

My question is therefore, are there use-cases where the number of tables exceeds what can be stored in memory? If not I don't see a compelling reason to make SchemaProvider async, we can potentially make TableProvider methods async to allow deferred loading of metadata, but SchemaProvider itself I think can remain sync?

A similar argument could be made for CatalogProvider, where you only need to know the schema names up front.

Edit: Apologies for not commenting sooner I've had too many plates lately to keep track of all these things, but seeing the complexity explosion this results in, with self-recursive async functions etc..., I'm a little bit apprehensive about this...

Edit Edit: Making TableProvider::schema async runs into the same issue of ending up making the planner async. I'll keep playing around

@JanKaul
Contributor

JanKaul commented Nov 29, 2022

I think the biggest issue with making SchemaProvider async is that it makes query planning async. The async gets introduced to the query planner in the ContextProvider implementation of SessionState:

impl ContextProvider for SessionState {
    fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
        let resolved_ref = self.resolve_table_ref(name);
        match self.schema_for_ref(resolved_ref) {
            Ok(schema) => {
                let provider = schema.table(resolved_ref.table).ok_or_else(|| { // <= async would be introduced here
                    DataFusionError::Plan(format!(
                        "table '{}.{}.{}' not found",
                        resolved_ref.catalog, resolved_ref.schema, resolved_ref.table
                    ))
                })?;
                Ok(provider_as_source(provider))
            }
            Err(e) => Err(e),
        }
    }
    ...
}

And I don't see a clean solution to keep ContextProvider sync while SchemaProvider is async. If you are pre-fetching the tables you could do it already in the SchemaProvider.

One approach could be to have an in memory mirror of the SchemaProvider that stores only the essential data. Every time its state is changed, it schedules an async task to update the actual storage without awaiting the response. This, however, could lead to data races if multiple users access the storage simultaneously.

Actually, after rethinking my earlier comment, I think it could make sense to make SchemaProvider partially async and ContextProvider sync. One could realize the SchemaProvider as an in memory mirror with a "synchronisation" operation with the underlying storage service. If we look at the SchemaProvider trait it has methods for reading and writing its state. One could make the reading methods sync, by reading from the in memory mirror and make the writing methods async, making sure the changes are propagated to the underlying storage service. Since the reading methods are sync, the ContextProvider trait could also be sync, simplifying the query planning step.

One problem with this is that external changes to the underlying storage system are not automatically reflected in the in memory mirror, thus one would need to make sure to trigger a "synchronisation" operation before using the trait.

Additionally, one could provide sync and async versions for the reading methods, allowing operations that want to be sure the mirror is up to date to use the async versions.
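
The mirror idea from this comment could be sketched as follows. All names (`RemoteStore`, `MirroredSchema`, `synchronize`) are hypothetical, a `Mutex<HashMap>` stands in for the real storage service, and the small `block_on` helper is only there so the sketch runs with the standard library alone.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex, RwLock};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal std-only executor, used here only to drive the async methods.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical remote storage service (stand-in for etcd, an HTTP API, ...).
#[derive(Default)]
struct RemoteStore {
    tables: Mutex<HashMap<String, String>>,
}

impl RemoteStore {
    async fn put(&self, name: String, def: String) {
        self.tables.lock().unwrap().insert(name, def);
    }

    async fn list(&self) -> HashMap<String, String> {
        self.tables.lock().unwrap().clone()
    }
}

/// Schema with sync reads from a local mirror and async writes to the remote.
struct MirroredSchema {
    remote: Arc<RemoteStore>,
    mirror: RwLock<HashMap<String, String>>,
}

impl MirroredSchema {
    fn new(remote: Arc<RemoteStore>) -> Self {
        MirroredSchema { remote, mirror: RwLock::new(HashMap::new()) }
    }

    /// Sync read: served from the mirror, so planning can stay sync.
    fn table(&self, name: &str) -> Option<String> {
        self.mirror.read().unwrap().get(name).cloned()
    }

    /// Async write: propagate to the remote first, then update the mirror.
    async fn register_table(&self, name: String, def: String) {
        self.remote.put(name.clone(), def.clone()).await;
        self.mirror.write().unwrap().insert(name, def);
    }

    /// Async "synchronisation": pull external changes into the mirror.
    async fn synchronize(&self) {
        let fresh = self.remote.list().await;
        *self.mirror.write().unwrap() = fresh;
    }
}
```

A table written to the remote by someone else stays invisible to `table()` until `synchronize()` runs, which is exactly the staleness caveat raised above.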

@tustvold
Contributor

Sorry for the delay, I plan to spend some time on this this week and try to come up with a proposal.

I think the key thing is to make it easy and obvious how to hook in a remote catalog, without requiring all of planning to become async. I'll see what I can come up with

tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Dec 13, 2022
tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Jan 3, 2023
@tustvold
Contributor

tustvold commented Jan 3, 2023

#4607 is now ready for review, PTAL. Hopefully we can get this in to make the release at the end of this week 😄

tustvold added a commit that referenced this issue Jan 5, 2023
* Make SchemaProvider async (#3777)

* Cleanup

* Workaround for ShowCreate

* Clippy and doctest

* Update sqllogictests

* Review feedback

* Format
@JanKaul
Contributor

JanKaul commented Jan 6, 2023

Thank you for your efforts!


10 participants