@@ -35,193 +35,17 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3535
3636use arrow:: json:: ReaderBuilder ;
3737use arrow:: { datatypes:: SchemaRef , json} ;
38- use datafusion_common:: { Constraints , Statistics } ;
38+ use datafusion_common:: Statistics ;
3939use datafusion_datasource:: file:: FileSource ;
4040use datafusion_datasource:: file_scan_config:: FileScanConfig ;
41- use datafusion_datasource:: source:: DataSourceExec ;
42- use datafusion_execution:: { SendableRecordBatchStream , TaskContext } ;
43- use datafusion_physical_expr:: { EquivalenceProperties , Partitioning } ;
44- use datafusion_physical_expr_common:: sort_expr:: LexOrdering ;
45- use datafusion_physical_plan:: execution_plan:: { Boundedness , EmissionType } ;
46- use datafusion_physical_plan:: metrics:: { ExecutionPlanMetricsSet , MetricsSet } ;
47- use datafusion_physical_plan:: { DisplayAs , DisplayFormatType , PlanProperties } ;
48-
49- use datafusion_datasource:: file_groups:: FileGroup ;
41+ use datafusion_execution:: TaskContext ;
42+ use datafusion_physical_plan:: metrics:: ExecutionPlanMetricsSet ;
43+
5044use futures:: { StreamExt , TryStreamExt } ;
5145use object_store:: buffered:: BufWriter ;
5246use object_store:: { GetOptions , GetResultPayload , ObjectStore } ;
5347use tokio:: io:: AsyncWriteExt ;
5448
55- /// Execution plan for scanning NdJson data source
56- #[ derive( Debug , Clone ) ]
57- #[ deprecated( since = "46.0.0" , note = "use DataSourceExec instead" ) ]
58- pub struct NdJsonExec {
59- inner : DataSourceExec ,
60- base_config : FileScanConfig ,
61- file_compression_type : FileCompressionType ,
62- }
63-
64- #[ allow( unused, deprecated) ]
65- impl NdJsonExec {
66- /// Create a new JSON reader execution plan provided base configurations
67- pub fn new (
68- base_config : FileScanConfig ,
69- file_compression_type : FileCompressionType ,
70- ) -> Self {
71- let (
72- projected_schema,
73- projected_constraints,
74- projected_statistics,
75- projected_output_ordering,
76- ) = base_config. project ( ) ;
77- let cache = Self :: compute_properties (
78- projected_schema,
79- & projected_output_ordering,
80- projected_constraints,
81- & base_config,
82- ) ;
83-
84- let json = JsonSource :: default ( ) ;
85- let base_config = base_config
86- . with_file_compression_type ( file_compression_type)
87- . with_source ( Arc :: new ( json) ) ;
88-
89- Self {
90- inner : DataSourceExec :: new ( Arc :: new ( base_config. clone ( ) ) ) ,
91- file_compression_type : base_config. file_compression_type ,
92- base_config,
93- }
94- }
95-
96- /// Ref to the base configs
97- pub fn base_config ( & self ) -> & FileScanConfig {
98- & self . base_config
99- }
100-
101- /// Ref to file compression type
102- pub fn file_compression_type ( & self ) -> & FileCompressionType {
103- & self . file_compression_type
104- }
105-
106- fn file_scan_config ( & self ) -> FileScanConfig {
107- self . inner
108- . data_source ( )
109- . as_any ( )
110- . downcast_ref :: < FileScanConfig > ( )
111- . unwrap ( )
112- . clone ( )
113- }
114-
115- fn json_source ( & self ) -> JsonSource {
116- let source = self . file_scan_config ( ) ;
117- source
118- . file_source ( )
119- . as_any ( )
120- . downcast_ref :: < JsonSource > ( )
121- . unwrap ( )
122- . clone ( )
123- }
124-
125- fn output_partitioning_helper ( file_scan_config : & FileScanConfig ) -> Partitioning {
126- Partitioning :: UnknownPartitioning ( file_scan_config. file_groups . len ( ) )
127- }
128-
129- /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
130- fn compute_properties (
131- schema : SchemaRef ,
132- orderings : & [ LexOrdering ] ,
133- constraints : Constraints ,
134- file_scan_config : & FileScanConfig ,
135- ) -> PlanProperties {
136- // Equivalence Properties
137- let eq_properties = EquivalenceProperties :: new_with_orderings ( schema, orderings)
138- . with_constraints ( constraints) ;
139-
140- PlanProperties :: new (
141- eq_properties,
142- Self :: output_partitioning_helper ( file_scan_config) , // Output Partitioning
143- EmissionType :: Incremental ,
144- Boundedness :: Bounded ,
145- )
146- }
147-
148- fn with_file_groups ( mut self , file_groups : Vec < FileGroup > ) -> Self {
149- self . base_config . file_groups = file_groups. clone ( ) ;
150- let mut file_source = self . file_scan_config ( ) ;
151- file_source = file_source. with_file_groups ( file_groups) ;
152- self . inner = self . inner . with_data_source ( Arc :: new ( file_source) ) ;
153- self
154- }
155- }
156-
157- #[ allow( unused, deprecated) ]
158- impl DisplayAs for NdJsonExec {
159- fn fmt_as (
160- & self ,
161- t : DisplayFormatType ,
162- f : & mut std:: fmt:: Formatter ,
163- ) -> std:: fmt:: Result {
164- self . inner . fmt_as ( t, f)
165- }
166- }
167-
168- #[ allow( unused, deprecated) ]
169- impl ExecutionPlan for NdJsonExec {
170- fn name ( & self ) -> & ' static str {
171- "NdJsonExec"
172- }
173-
174- fn as_any ( & self ) -> & dyn Any {
175- self
176- }
177- fn properties ( & self ) -> & PlanProperties {
178- self . inner . properties ( )
179- }
180-
181- fn children ( & self ) -> Vec < & Arc < dyn ExecutionPlan > > {
182- Vec :: new ( )
183- }
184-
185- fn with_new_children (
186- self : Arc < Self > ,
187- _: Vec < Arc < dyn ExecutionPlan > > ,
188- ) -> Result < Arc < dyn ExecutionPlan > > {
189- Ok ( self )
190- }
191-
192- fn repartitioned (
193- & self ,
194- target_partitions : usize ,
195- config : & datafusion_common:: config:: ConfigOptions ,
196- ) -> Result < Option < Arc < dyn ExecutionPlan > > > {
197- self . inner . repartitioned ( target_partitions, config)
198- }
199-
200- fn execute (
201- & self ,
202- partition : usize ,
203- context : Arc < TaskContext > ,
204- ) -> Result < SendableRecordBatchStream > {
205- self . inner . execute ( partition, context)
206- }
207-
208- fn statistics ( & self ) -> Result < Statistics > {
209- self . inner . statistics ( )
210- }
211-
212- fn metrics ( & self ) -> Option < MetricsSet > {
213- self . inner . metrics ( )
214- }
215-
216- fn fetch ( & self ) -> Option < usize > {
217- self . inner . fetch ( )
218- }
219-
220- fn with_fetch ( & self , limit : Option < usize > ) -> Option < Arc < dyn ExecutionPlan > > {
221- self . inner . with_fetch ( limit)
222- }
223- }
224-
22549/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
22650pub struct JsonOpener {
22751 batch_size : usize ,
0 commit comments