File tree Expand file tree Collapse file tree 13 files changed +23
-31
lines changed
core/src/serde/physical_plan Expand file tree Collapse file tree 13 files changed +23
-31
lines changed Original file line number Diff line number Diff line change @@ -142,7 +142,7 @@ impl BallistaContext {
142142 let path = fs:: canonicalize ( & path) ?;
143143
144144 // use local DataFusion context for now but later this might call the scheduler
145- let mut ctx = create_datafusion_context ( & self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
145+ let mut ctx = create_datafusion_context ( self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
146146 let df = ctx. read_parquet ( path. to_str ( ) . unwrap ( ) ) ?;
147147 Ok ( df)
148148 }
@@ -159,7 +159,7 @@ impl BallistaContext {
159159 let path = fs:: canonicalize ( & path) ?;
160160
161161 // use local DataFusion context for now but later this might call the scheduler
162- let mut ctx = create_datafusion_context ( & self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
162+ let mut ctx = create_datafusion_context ( self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
163163 let df = ctx. read_csv ( path. to_str ( ) . unwrap ( ) , options) ?;
164164 Ok ( df)
165165 }
@@ -193,7 +193,7 @@ impl BallistaContext {
193193 // use local DataFusion context for now but later this might call the scheduler
194194 // register tables
195195 let state = self . state . lock ( ) . unwrap ( ) ;
196- let mut ctx = create_datafusion_context ( & state. config ( ) ) ;
196+ let mut ctx = create_datafusion_context ( state. config ( ) ) ;
197197 for ( name, plan) in & state. tables {
198198 let plan = ctx. optimize ( plan) ?;
199199 let execution_plan = ctx. create_physical_plan ( & plan) ?;
Original file line number Diff line number Diff line change @@ -131,9 +131,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
131131 }
132132 PhysicalPlanType :: ParquetScan ( scan) => {
133133 let projection = scan. projection . iter ( ) . map ( |i| * i as usize ) . collect ( ) ;
134- let path: & str = & scan. filename [ 0 ] . as_str ( ) ;
134+ let path: & str = scan. filename [ 0 ] . as_str ( ) ;
135135 Ok ( Arc :: new ( ParquetExec :: try_from_path (
136- & path,
136+ path,
137137 Some ( projection) ,
138138 None ,
139139 scan. batch_size as usize ,
Original file line number Diff line number Diff line change @@ -259,7 +259,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
259259 let filenames = exec
260260 . partitions ( )
261261 . iter ( )
262- . flat_map ( |part| part. filenames ( ) . to_owned ( ) )
262+ . flat_map ( |part| part. filenames ( ) )
263263 . collect ( ) ;
264264 Ok ( protobuf:: PhysicalPlanNode {
265265 physical_plan_type : Some ( PhysicalPlanType :: ParquetScan (
Original file line number Diff line number Diff line change @@ -316,7 +316,7 @@ impl SchedulerState {
316316 ) )
317317 . unwrap ( ) ;
318318 let task_is_dead = self
319- . reschedule_dead_task ( & referenced_task, & executors)
319+ . reschedule_dead_task ( referenced_task, & executors)
320320 . await ?;
321321 if task_is_dead {
322322 continue ' tasks;
Original file line number Diff line number Diff line change @@ -484,7 +484,7 @@ fn get_table(
484484 let path = format ! ( "{}/{}" , path, table) ;
485485 Ok ( Arc :: new ( ParquetTable :: try_new (
486486 & path,
487- create_datafusion_context_concurrency ( max_concurrency) ,
487+ ExecutionContext :: with_concurrency ( max_concurrency) ,
488488 ) ?) )
489489 }
490490 other => {
@@ -493,11 +493,6 @@ fn get_table(
493493 }
494494}
495495
496- fn create_datafusion_context_concurrency ( n : usize ) -> ExecutionContext {
497- let config = ExecutionConfig :: new ( ) . with_concurrency ( n) ;
498- ExecutionContext :: with_config ( config)
499- }
500-
501496fn get_schema ( table : & str ) -> Schema {
502497 // note that the schema intentionally uses signed integers so that any generated Parquet
503498 // files can also be used to benchmark tools that only support signed integers, such as
Original file line number Diff line number Diff line change @@ -66,8 +66,8 @@ impl CsvFile {
6666 let schema = Arc :: new ( match options. schema {
6767 Some ( s) => s. clone ( ) ,
6868 None => {
69- let filenames =
70- LocalFileSystem { } . list_all_files ( & path, options. file_extension ) ?;
69+ let filenames = LocalFileSystem
70+ . list_all_files ( path. as_str ( ) , options. file_extension ) ?;
7171 if filenames. is_empty ( ) {
7272 return Err ( DataFusionError :: Plan ( format ! (
7373 "No files found at {path} with file extension {file_extension}" ,
Original file line number Diff line number Diff line change @@ -59,7 +59,7 @@ impl NdJsonFile {
5959 schema
6060 } else {
6161 let filenames =
62- LocalFileSystem { } . list_all_files ( & path, options. file_extension ) ?;
62+ LocalFileSystem . list_all_files ( path, options. file_extension ) ?;
6363 if filenames. is_empty ( ) {
6464 return Err ( DataFusionError :: Plan ( format ! (
6565 "No files found at {path} with file extension {file_extension}" ,
Original file line number Diff line number Diff line change @@ -33,7 +33,7 @@ pub struct LocalFileSystem;
3333
3434impl ObjectStore for LocalFileSystem {
3535 fn as_any ( & self ) -> & dyn Any {
36- return self ;
36+ self
3737 }
3838
3939 fn list_all_files ( & self , path : & str , ext : & str ) -> Result < Vec < String > > {
Original file line number Diff line number Diff line change @@ -78,7 +78,7 @@ impl ObjectStoreRegistry {
7878 store : Arc < dyn ObjectStore > ,
7979 ) -> Option < Arc < dyn ObjectStore > > {
8080 let mut stores = self . object_stores . write ( ) . unwrap ( ) ;
81- stores. insert ( scheme. to_string ( ) , store)
81+ stores. insert ( scheme, store)
8282 }
8383
8484 /// Get the store registered for scheme
Original file line number Diff line number Diff line change @@ -58,7 +58,7 @@ impl ParquetTable {
5858 let max_concurrency = context. state . lock ( ) . unwrap ( ) . config . concurrency ;
5959 let root_desc = ParquetRootDesc :: new ( path. as_str ( ) , context) ;
6060 Ok ( Self {
61- path : path . clone ( ) ,
61+ path,
6262 desc : Arc :: new ( root_desc?) ,
6363 max_concurrency,
6464 enable_pruning : true ,
@@ -222,7 +222,7 @@ impl SourceRootDescBuilder for ParquetRootDesc {
222222 } ;
223223
224224 Ok ( PartitionedFile {
225- file_path : file_path . clone ( ) ,
225+ file_path,
226226 schema,
227227 statistics,
228228 partition_value : None ,
You can’t perform that action at this time.
0 commit comments