@@ -26,6 +26,12 @@ use crate::{
2626 object_storage:: get_object_store,
2727 print_options:: { MaxRows , PrintOptions } ,
2828} ;
29+ use futures:: StreamExt ;
30+ use std:: collections:: HashMap ;
31+ use std:: fs:: File ;
32+ use std:: io:: prelude:: * ;
33+ use std:: io:: BufReader ;
34+
2935use datafusion:: common:: instant:: Instant ;
3036use datafusion:: common:: { plan_datafusion_err, plan_err} ;
3137use datafusion:: config:: ConfigFileType ;
@@ -35,15 +41,13 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
3541use datafusion:: physical_plan:: execution_plan:: EmissionType ;
3642use datafusion:: physical_plan:: { execute_stream, ExecutionPlanProperties } ;
3743use datafusion:: sql:: parser:: { DFParser , Statement } ;
38- use datafusion:: sql:: sqlparser;
3944use datafusion:: sql:: sqlparser:: dialect:: dialect_from_str;
45+
46+ use datafusion:: execution:: memory_pool:: MemoryConsumer ;
47+ use datafusion:: physical_plan:: spill:: get_record_batch_memory_size;
48+ use datafusion:: sql:: sqlparser;
4049use rustyline:: error:: ReadlineError ;
4150use rustyline:: Editor ;
42- use std:: collections:: HashMap ;
43- use std:: fs:: File ;
44- use std:: io:: prelude:: * ;
45- use std:: io:: BufReader ;
46- use std:: sync:: Arc ;
4751use tokio:: signal;
4852
4953/// run and execute SQL statements and commands, against a context with the given print options
@@ -225,17 +229,18 @@ pub(super) async fn exec_and_print(
225229 for statement in statements {
226230 let adjusted =
227231 AdjustedPrintOptions :: new ( print_options. clone ( ) ) . with_statement ( & statement) ;
232+
228233 let plan = create_plan ( ctx, statement) . await ?;
229234 let adjusted = adjusted. with_plan ( & plan) ;
230235
231236 let df = ctx. execute_logical_plan ( plan) . await ?;
232237 let physical_plan = df. create_physical_plan ( ) . await ?;
233238
234- let is_unbounded = physical_plan. boundedness ( ) . is_unbounded ( ) ;
235- let mut stream = execute_stream ( Arc :: clone ( & physical_plan) , task_ctx. clone ( ) ) ?;
239+ // Track memory usage for the query result if it's bounded
240+ let mut reservation =
241+ MemoryConsumer :: new ( "DataFusion-Cli" ) . register ( task_ctx. memory_pool ( ) ) ;
236242
237- // Both bounded and unbounded streams are streaming prints
238- if is_unbounded {
243+ if physical_plan. boundedness ( ) . is_unbounded ( ) {
239244 if physical_plan. pipeline_behavior ( ) == EmissionType :: Final {
240245 return plan_err ! (
241246 "The given query can generate a valid result only once \
@@ -244,43 +249,37 @@ pub(super) async fn exec_and_print(
244249 }
245250 // As the input stream comes, we can generate results.
246251 // However, memory safety is not guaranteed.
247- print_options
248- . print_stream ( MaxRows :: Unlimited , stream, now)
249- . await ?;
252+ let stream = execute_stream ( physical_plan, task_ctx. clone ( ) ) ?;
253+ print_options. print_stream ( stream, now) . await ?;
250254 } else {
251255 // Bounded stream; collected results size is limited by the maxrows option
252256 let schema = physical_plan. schema ( ) ;
257+ let mut stream = execute_stream ( physical_plan, task_ctx. clone ( ) ) ?;
258+ let mut results = vec ! [ ] ;
259+ let mut row_count = 0_usize ;
253260 let max_rows = match print_options. maxrows {
254261 MaxRows :: Unlimited => usize:: MAX ,
255262 MaxRows :: Limited ( n) => n,
256263 } ;
257- let stdout = std :: io :: stdout ( ) ;
258- let mut writer = stdout . lock ( ) ;
259-
260- // If we don't want to print the table, we should use the streaming print same as above
261- if print_options . format != PrintFormat :: Table
262- && print_options . format != PrintFormat :: Automatic
263- {
264- print_options
265- . print_stream ( print_options . maxrows , stream , now )
266- . await ? ;
267- continue ;
264+ while let Some ( batch ) = stream . next ( ) . await {
265+ let batch = batch? ;
266+ let curr_num_rows = batch . num_rows ( ) ;
267+ // Stop collecting results if the number of rows exceeds the limit
268+ // results batch should include the last batch that exceeds the limit
269+ if row_count < max_rows + curr_num_rows {
270+ // Try to grow the reservation to accommodate the batch in memory
271+ reservation . try_grow ( get_record_batch_memory_size ( & batch ) ) ? ;
272+ results . push ( batch ) ;
273+ }
274+ row_count += curr_num_rows ;
268275 }
269-
270- // into_inner will finalize the print options to table if it's automatic
271276 adjusted
272277 . into_inner ( )
273- . print_table_batch (
274- print_options,
275- schema,
276- & mut stream,
277- max_rows,
278- & mut writer,
279- now,
280- )
281- . await ?;
278+ . print_batches ( schema, & results, now, row_count) ?;
279+ reservation. free ( ) ;
282280 }
283281 }
282+
284283 Ok ( ( ) )
285284}
286285
0 commit comments