@@ -20,35 +20,29 @@ use std::sync::Arc;
 
 use arrow::array::{RecordBatch, StringArray};
 use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
-use async_trait::async_trait;
 
 use datafusion::assert_batches_eq;
-use datafusion::catalog::memory::DataSourceExec;
-use datafusion::catalog::{Session, TableProvider};
 use datafusion::common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
-use datafusion::common::{assert_contains, DFSchema, Result};
-use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::common::{assert_contains, Result};
+use datafusion::datasource::listing::{
+    ListingTable, ListingTableConfig, ListingTableUrl,
+};
 use datafusion::execution::context::SessionContext;
 use datafusion::execution::object_store::ObjectStoreUrl;
-use datafusion::logical_expr::utils::conjunction;
 use datafusion::logical_expr::{
-    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
-    TableProviderFilterPushDown, TableType, Volatility,
+    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
 };
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{lit, SessionConfig};
+use datafusion::prelude::SessionConfig;
 use datafusion::scalar::ScalarValue;
 use datafusion_physical_expr_adapter::{
     DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
 };
-use futures::StreamExt;
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectStore, PutPayload};
@@ -95,23 +89,29 @@ async fn main() -> Result<()> {
     let payload = PutPayload::from_bytes(buf.into());
     store.put(&path, payload).await?;
 
-    // Create a custom table provider that rewrites struct field access
-    let table_provider = Arc::new(ExampleTableProvider::new(table_schema));
-
     // Set up query execution
     let mut cfg = SessionConfig::new();
     cfg.options_mut().execution.parquet.pushdown_filters = true;
     let ctx = SessionContext::new_with_config(cfg);
-
-    // Register our table
-    ctx.register_table("structs", table_provider)?;
-    ctx.register_udf(ScalarUDF::new_from_impl(JsonGetStr::default()));
-
     ctx.runtime_env().register_object_store(
         ObjectStoreUrl::parse("memory://")?.as_ref(),
         Arc::new(store),
     );
 
+    // Create a listing table whose expression adapter rewrites struct field access
+    let listing_table_config =
+        ListingTableConfig::new(ListingTableUrl::parse("memory:///example.parquet")?)
+            .infer_options(&ctx.state())
+            .await?
+            .with_schema(table_schema)
+            .with_expr_adapter_factory(Arc::new(ShreddedJsonRewriterFactory));
+    let table = ListingTable::try_new(listing_table_config).unwrap();
+    let table_provider = Arc::new(table);
+
+    // Register our table
+    ctx.register_table("structs", table_provider)?;
+    ctx.register_udf(ScalarUDF::new_from_impl(JsonGetStr::default()));
+
     println!("\n === Showing all data ===");
     let batches = ctx.sql("SELECT * FROM structs").await?.collect().await?;
     arrow::util::pretty::print_batches(&batches)?;
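The ListingTable above is handed a ShreddedJsonRewriterFactory, an implementation of PhysicalExprAdapterFactory defined further down in this example, via with_expr_adapter_factory. For orientation only, a minimal factory that performs no JSON-specific rewriting and simply delegates to DataFusion's DefaultPhysicalExprAdapterFactory might look like the sketch below. The name PassthroughRewriterFactory is hypothetical and the trait method signature shown is an assumption inferred from the imports in this diff; consult the datafusion_physical_expr_adapter docs for the exact shapes.

// Hypothetical minimal factory (not part of this commit). It does no JSON-specific
// rewriting and simply delegates to DefaultPhysicalExprAdapterFactory, which adapts
// expressions written against the table schema to the physical file schema.
// The `create` signature here is assumed, not taken from this diff.
#[derive(Debug)]
struct PassthroughRewriterFactory;

impl PhysicalExprAdapterFactory for PassthroughRewriterFactory {
    fn create(
        &self,
        logical_file_schema: SchemaRef,
        physical_file_schema: SchemaRef,
    ) -> Arc<dyn PhysicalExprAdapter> {
        // Delegate entirely to the default adapter.
        DefaultPhysicalExprAdapterFactory.create(logical_file_schema, physical_file_schema)
    }
}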
@@ -191,96 +191,6 @@ fn create_sample_record_batch(file_schema: &Schema) -> RecordBatch {
         .unwrap()
 }
 
-/// Custom TableProvider that uses a StructFieldRewriter
-#[derive(Debug)]
-struct ExampleTableProvider {
-    schema: SchemaRef,
-}
-
-impl ExampleTableProvider {
-    fn new(schema: SchemaRef) -> Self {
-        Self { schema }
-    }
-}
-
-#[async_trait]
-impl TableProvider for ExampleTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> Result<Vec<TableProviderFilterPushDown>> {
-        // Implementers can choose to mark these filters as exact or inexact.
-        // If marked as exact they cannot have false positives and must always be applied.
-        // If marked as Inexact they can have false positives and at runtime the rewriter
-        // can decide to not rewrite / ignore some filters since they will be re-evaluated upstream.
-        // For the purposes of this example we mark them as Exact to demonstrate the rewriter is working and the filtering is not being re-evaluated upstream.
-        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
-    }
-
-    async fn scan(
-        &self,
-        state: &dyn Session,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let schema = self.schema.clone();
-        let df_schema = DFSchema::try_from(schema.clone())?;
-        let filter = state.create_physical_expr(
-            conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
-            &df_schema,
-        )?;
-
-        let parquet_source = ParquetSource::default()
-            .with_predicate(filter)
-            .with_pushdown_filters(true);
-
-        let object_store_url = ObjectStoreUrl::parse("memory://")?;
-
-        let store = state.runtime_env().object_store(object_store_url)?;
-
-        let mut files = vec![];
-        let mut listing = store.list(None);
-        while let Some(file) = listing.next().await {
-            if let Ok(file) = file {
-                files.push(file);
-            }
-        }
-
-        let file_group = files
-            .iter()
-            .map(|file| PartitionedFile::new(file.location.clone(), file.size))
-            .collect();
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            ObjectStoreUrl::parse("memory://")?,
-            schema,
-            Arc::new(parquet_source),
-        )
-        .with_projection(projection.cloned())
-        .with_limit(limit)
-        .with_file_group(file_group)
-        // if the rewriter needs a reference to the table schema you can bind self.schema() here
-        .with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));
-
-        Ok(Arc::new(DataSourceExec::new(Arc::new(
-            file_scan_config.build(),
-        ))))
-    }
-}
-
 /// Scalar UDF that uses serde_json to access json fields
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct JsonGetStr {