3232use arrow:: array:: record_batch;
3333use arrow_schema:: { Field , Fields , Schema , SchemaRef } ;
3434use async_trait:: async_trait;
35- use datafusion:: catalog:: { SchemaProvider , TableProvider } ;
36- use datafusion:: common:: DataFusionError ;
35+ use datafusion:: catalog:: TableProvider ;
3736use datafusion:: common:: Result ;
3837use datafusion:: execution:: SendableRecordBatchStream ;
3938use datafusion:: physical_plan:: memory:: MemoryExec ;
4039use datafusion:: physical_plan:: stream:: RecordBatchStreamAdapter ;
4140use datafusion:: physical_plan:: ExecutionPlan ;
4241use datafusion:: prelude:: { DataFrame , SessionContext } ;
43- use datafusion_catalog:: Session ;
44- use datafusion_common:: {
45- assert_batches_eq, internal_datafusion_err, plan_err, HashMap , TableReference ,
46- } ;
42+ use datafusion_catalog:: { AsyncSchemaProvider , Session } ;
43+ use datafusion_common:: { assert_batches_eq, internal_datafusion_err, plan_err} ;
4744use datafusion_expr:: { Expr , TableType } ;
4845use futures:: TryStreamExt ;
4946use std:: any:: Any ;
50- use std:: sync:: { Arc , Mutex } ;
47+ use std:: sync:: Arc ;
5148
5249#[ tokio:: main]
5350async fn main ( ) -> Result < ( ) > {
5451 // As always, we create a session context to interact with DataFusion
5552 let ctx = SessionContext :: new ( ) ;
5653
5754 // Make a connection to the remote catalog, asynchronously, and configure it
58- let remote_catalog_interface = RemoteCatalogInterface :: connect ( ) . await ?;
59-
60- // Register a SchemaProvider for tables in a schema named "remote_schema".
61- //
62- // This will let DataFusion query tables such as
63- // `datafusion.remote_schema.remote_table`
64- let remote_schema: Arc < dyn SchemaProvider > =
65- Arc :: new ( RemoteSchema :: new ( Arc :: new ( remote_catalog_interface) ) ) ;
66- ctx. catalog ( "datafusion" )
67- . ok_or_else ( || internal_datafusion_err ! ( "default catalog was not installed" ) ) ?
68- . register_schema ( "remote_schema" , Arc :: clone ( & remote_schema) ) ?;
55+ let remote_catalog_interface = Arc :: new ( RemoteCatalogInterface :: connect ( ) . await ?) ;
56+
57+ // Create an adapter to provide the AsyncSchemaProvider interface to DataFusion
58+ // based on our remote catalog interface
59+ let remote_catalog_adapter = RemoteCatalogDatafusionAdapter ( remote_catalog_interface) ;
6960
7061 // Here is a query that selects data from a table in the remote catalog.
7162 let sql = "SELECT * from remote_schema.remote_table" ;
7263
7364 // The `SessionContext::sql` interface is async, but it does not
74- // support asynchronous access to catalogs, so the following query errors.
65+ // support asynchronous access to catalogs, so we cannot register our schema provider
66+ // directly and the following query fails to find our table.
7567 let results = ctx. sql ( sql) . await ;
7668 assert_eq ! (
7769 results. unwrap_err( ) . to_string( ) ,
@@ -91,27 +83,26 @@ async fn main() -> Result<()> {
9183 // `remote_schema.remote_table`)
9284 let references = state. resolve_table_references ( & statement) ?;
9385
94- // Call `load_tables` to load information from the remote catalog for each
95- // of the referenced tables. Best practice is to fetch the the information
96- // for all tables required by the query once (rather than one per table) to
97- // minimize network overhead
98- let table_names = references. iter ( ) . filter_map ( |r| {
99- if refers_to_schema ( "datafusion" , "remote_schema" , r) {
100- Some ( r. table ( ) )
101- } else {
102- None
103- }
104- } ) ;
105- remote_schema
106- . as_any ( )
107- . downcast_ref :: < RemoteSchema > ( )
108- . expect ( "correct types" )
109- . load_tables ( table_names)
86+ // Now we can asynchronously resolve the table references to get a cached catalog
87+ // that we can use for our query
88+ let resolved_catalog = remote_catalog_adapter
89+ . resolve ( & references, state. config ( ) )
11090 . await ?;
11191
112- // Now continue planing the query after having fetched the remote table and
113- // it can run as normal
114- let plan = state. statement_to_plan ( statement) . await ?;
92+ // This resolved catalog only makes sense for this query and so we create a clone
93+ // of the session context with the resolved catalog
94+ let query_ctx = ctx. clone ( ) ;
95+
96+ query_ctx
97+ . catalog ( "datafusion" )
98+ . ok_or_else ( || internal_datafusion_err ! ( "default catalog was not installed" ) ) ?
99+ . register_schema ( "remote_schema" , resolved_catalog) ?;
100+
101+ // We can now continue planning the query with this new query-specific context that
102+ // contains our cached catalog
103+ let query_state = query_ctx. state ( ) ;
104+
105+ let plan = query_state. statement_to_plan ( statement) . await ?;
115106 let results = DataFrame :: new ( state, plan) . collect ( ) . await ?;
116107 assert_batches_eq ! (
117108 [
@@ -145,9 +136,9 @@ impl RemoteCatalogInterface {
145136 }
146137
147138 /// Fetches information for a specific table
148- pub async fn table_info ( & self , name : & str ) -> Result < SchemaRef > {
139+ pub async fn table_info ( & self , name : & str ) -> Result < Option < SchemaRef > > {
149140 if name != "remote_table" {
150- return plan_err ! ( "Remote table not found: {}" , name ) ;
141+ return Ok ( None ) ;
151142 }
152143
153144 // In this example, we'll model a remote table with columns "id" and
@@ -159,7 +150,7 @@ impl RemoteCatalogInterface {
159150 Field :: new( "id" , arrow:: datatypes:: DataType :: Int32 , false ) ,
160151 Field :: new( "name" , arrow:: datatypes:: DataType :: Utf8 , false ) ,
161152 ] ) ) ;
162- Ok ( Arc :: new ( schema) )
153+ Ok ( Some ( Arc :: new ( schema) ) )
163154 }
164155
165156 /// Fetches data for a table from a remote data source
@@ -186,95 +177,30 @@ impl RemoteCatalogInterface {
186177 }
187178}
188179
189- /// Implements the DataFusion Catalog API interface for tables
180+ /// Implements the DataFusion SchemaProvider API for tables
190181/// stored in a remote catalog.
191- #[ derive( Debug ) ]
192- struct RemoteSchema {
193- /// Connection with the remote catalog
194- remote_catalog_interface : Arc < RemoteCatalogInterface > ,
195- /// Local cache of tables that have been preloaded from the remote
196- /// catalog
197- tables : Mutex < HashMap < String , Arc < dyn TableProvider > > > ,
198- }
199-
200- impl RemoteSchema {
201- /// Create a new RemoteSchema
202- pub fn new ( remote_catalog_interface : Arc < RemoteCatalogInterface > ) -> Self {
203- Self {
204- remote_catalog_interface,
205- tables : Mutex :: new ( HashMap :: new ( ) ) ,
206- }
207- }
208-
209- /// Load information for the specified tables from the remote source into
210- /// the local cached copy.
211- pub async fn load_tables (
212- & self ,
213- references : impl IntoIterator < Item = & str > ,
214- ) -> Result < ( ) > {
215- for table_name in references {
216- if !self . table_exist ( table_name) {
217- // Fetch information about the table from the remote catalog
218- //
219- // Note that a real remote catalog interface could return more
220- // information, but at the minimum, DataFusion requires the
221- // table's schema for planing.
222- let schema = self . remote_catalog_interface . table_info ( table_name) . await ?;
223- let remote_table = RemoteTable :: new (
224- Arc :: clone ( & self . remote_catalog_interface ) ,
225- table_name,
226- schema,
227- ) ;
228-
229- // Add the table to our local cached list
230- self . tables
231- . lock ( )
232- . expect ( "mutex invalid" )
233- . insert ( table_name. to_string ( ) , Arc :: new ( remote_table) ) ;
234- } ;
235- }
236- Ok ( ( ) )
237- }
238- }
182+ struct RemoteCatalogDatafusionAdapter ( Arc < RemoteCatalogInterface > ) ;
239183
240- /// Implement the DataFusion Catalog API for [`RemoteSchema`]
241184#[ async_trait]
242- impl SchemaProvider for RemoteSchema {
243- fn as_any ( & self ) -> & dyn Any {
244- self
245- }
246-
247- fn table_names ( & self ) -> Vec < String > {
248- // Note this API is not async so we can't directly call the RemoteCatalogInterface
249- // instead we use the cached list of loaded tables
250- self . tables
251- . lock ( )
252- . expect ( "mutex valid" )
253- . keys ( )
254- . cloned ( )
255- . collect ( )
185+ impl AsyncSchemaProvider for RemoteCatalogDatafusionAdapter {
186+ fn name ( & self ) -> & str {
187+ "remote_schema"
256188 }
257189
258- // While this API is actually `async` and thus could consult a remote
259- // catalog directly it is more efficient to use a local cached copy instead,
260- // which is what we model in this example
261- async fn table (
262- & self ,
263- name : & str ,
264- ) -> Result < Option < Arc < dyn TableProvider > > , DataFusionError > {
265- // Look for any pre-loaded tables
266- let table = self
267- . tables
268- . lock ( )
269- . expect ( "mutex valid" )
270- . get ( name)
271- . map ( Arc :: clone) ;
272- Ok ( table)
190+ fn catalog_name ( & self ) -> & str {
191+ "datafusion"
273192 }
274193
275- fn table_exist ( & self , name : & str ) -> bool {
276- // Look for any pre-loaded tables, note this function is also `async`
277- self . tables . lock ( ) . expect ( "mutex valid" ) . contains_key ( name)
194+ async fn table ( & self , name : & str ) -> Result < Option < Arc < dyn TableProvider > > > {
195+ // Fetch information about the table from the remote catalog
196+ //
197+ // Note that a real remote catalog interface could return more
198+ // information, but at the minimum, DataFusion requires the
199+ // table's schema for planing.
200+ Ok ( self . 0 . table_info ( name) . await ?. map ( |schema| {
201+ Arc :: new ( RemoteTable :: new ( Arc :: clone ( & self . 0 ) , name, schema) )
202+ as Arc < dyn TableProvider >
203+ } ) )
278204 }
279205}
280206
@@ -343,27 +269,3 @@ impl TableProvider for RemoteTable {
343269 ) ?) )
344270 }
345271}
346-
347- /// Return true if this `table_reference` might be for a table in the specified
348- /// catalog and schema.
349- fn refers_to_schema (
350- catalog_name : & str ,
351- schema_name : & str ,
352- table_reference : & TableReference ,
353- ) -> bool {
354- // Check the references are in the correct catalog and schema
355- // references like foo.bar.baz
356- if let Some ( catalog) = table_reference. catalog ( ) {
357- if catalog != catalog_name {
358- return false ;
359- }
360- }
361- // references like bar.baz
362- if let Some ( schema) = table_reference. schema ( ) {
363- if schema != schema_name {
364- return false ;
365- }
366- }
367-
368- true
369- }
0 commit comments