diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 7416b73971b2..100d7bce440c 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -24,7 +24,7 @@ use crate::{
         get_gcs_object_store_builder, get_oss_object_store_builder,
         get_s3_object_store_builder,
     },
-    print_options::PrintOptions,
+    print_options::{MaxRows, PrintOptions},
 };
 use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
 use datafusion::{
@@ -211,6 +211,15 @@ async fn exec_and_print(
     let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
     for statement in statements {
         let plan = ctx.state().statement_to_plan(statement).await?;
+
+        // For plans like `Explain` ignore `MaxRows` option and always display all rows
+        let should_ignore_maxrows = matches!(
+            plan,
+            LogicalPlan::Explain(_)
+                | LogicalPlan::DescribeTable(_)
+                | LogicalPlan::Analyze(_)
+        );
+
         let df = match &plan {
             LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => {
                 create_external_table(ctx, cmd).await?;
@@ -220,8 +229,18 @@ async fn exec_and_print(
         };
         let results = df.collect().await?;
+
+        let print_options = if should_ignore_maxrows {
+            PrintOptions {
+                maxrows: MaxRows::Unlimited,
+                ..print_options.clone()
+            }
+        } else {
+            print_options.clone()
+        };
         print_options.print_batches(&results, now)?;
     }
+
     Ok(())
 }
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 25f412608453..33a1caeb1b5b 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -23,7 +23,10 @@ use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicFileCatalog;
 use datafusion_cli::{
-    exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
+    exec,
+    print_format::PrintFormat,
+    print_options::{MaxRows, PrintOptions},
+    DATAFUSION_CLI_VERSION,
 };
 use mimalloc::MiMalloc;
 use std::collections::HashMap;
@@ -122,6 +125,13 @@ struct Args {
         help = "Specify the memory pool type 'greedy' or 'fair', default to 'greedy'"
    )]
     mem_pool_type: Option<String>,
+
+    #[clap(
+        long,
+        help = "The max number of rows to display for 'Table' format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]",
+        default_value = "40"
+    )]
+    maxrows: MaxRows,
 }
 
 #[tokio::main]
@@ -179,6 +189,7 @@ pub async fn main() -> Result<()> {
     let mut print_options = PrintOptions {
         format: args.format,
         quiet: args.quiet,
+        maxrows: args.maxrows,
     };
 
     let commands = args.command;
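The value handed to the new `--maxrows` clap argument above is parsed into the `MaxRows` type that this patch adds to `print_options.rs` further down. The following is a minimal sketch, not part of the patch, of the parsing behaviour implied by that `FromStr` impl and the `default_value = "40"` setting, written as if it sat alongside the impl in `print_options.rs` (the test name is hypothetical):

```rust
#[test]
fn maxrows_parsing_sketch() {
    use std::str::FromStr;

    // Plain numbers become a row cap; "40" mirrors the clap default above.
    assert_eq!(MaxRows::from_str("40"), Ok(MaxRows::Limited(40)));

    // "inf", "infinite" and "none" (case-insensitive) all disable the cap.
    assert_eq!(MaxRows::from_str("INF"), Ok(MaxRows::Unlimited));
    assert_eq!(MaxRows::from_str("none"), Ok(MaxRows::Unlimited));

    // Anything else is rejected; clap surfaces the error string from `from_str`.
    assert!(MaxRows::from_str("-1").is_err());
    assert!(MaxRows::from_str("ten").is_err());
}
```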
diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs
index 8d8c0e4a3941..e2994bc14034 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Print format variants
+use crate::print_options::MaxRows;
 use arrow::csv::writer::WriterBuilder;
 use arrow::json::{ArrayWriter, LineDelimitedWriter};
 use arrow::util::pretty::pretty_format_batches_with_options;
@@ -70,17 +71,86 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<Str
     Ok(formatted)
 }
 
+fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
+    let lines: Vec<String> = s.lines().map(String::from).collect();
+
+    assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border
+
+    let last_line = &lines[lines.len() - 1]; // bottom border line
+
+    let spaces = last_line.len().saturating_sub(4);
+    let dotted_line = format!("| .{:<spaces$}|", "");
+
+    let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep the header and first `maxrows` rows
+    result.extend(vec![dotted_line; 3]); // Dotted lines to indicate truncated output
+    result.push(last_line.clone()); // Keep the bottom border
+
+    result.join("\n")
+}
+
+fn format_batches_with_maxrows(
+    batches: &[RecordBatch],
+    maxrows: MaxRows,
+) -> Result<String> {
+    match maxrows {
+        MaxRows::Limited(maxrows) => {
+            // Only format enough batches for maxrows
+            let mut filtered_batches = Vec::new();
+            let mut batches = batches;
+            let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
+            if row_count > maxrows {
+                let mut accumulated_rows = 0;
+
+                for batch in batches {
+                    filtered_batches.push(batch.clone());
+                    if accumulated_rows + batch.num_rows() > maxrows {
+                        break;
+                    }
+                    accumulated_rows += batch.num_rows();
+                }
+
+                batches = &filtered_batches;
+            }
+
+            let mut formatted = format!(
+                "{}",
+                pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
+            );
+
+            if row_count > maxrows {
+                formatted = keep_only_maxrows(&formatted, maxrows);
+            }
+
+            Ok(formatted)
+        }
+        MaxRows::Unlimited => {
+            // maxrows not specified, print all rows
+            Ok(format!(
+                "{}",
+                pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
+            ))
+        }
+    }
+}
+
 impl PrintFormat {
     /// print the batches to stdout using the specified format
-    pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> {
+    /// `maxrows` option is only used for `Table` format:
+    /// If `maxrows` is Some(n), then at most n rows will be displayed
+    /// If `maxrows` is None, then every row will be displayed
+    pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) -> Result<()> {
+        if batches.is_empty() {
+            return Ok(());
+        }
+
         match self {
             Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
             Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
             Self::Table => {
-                println!(
-                    "{}",
-                    pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?
-                )
+                if maxrows == MaxRows::Limited(0) {
+                    return Ok(());
+                }
+                println!("{}", format_batches_with_maxrows(batches, maxrows)?,)
             }
             Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)),
             Self::NdJson => {
@@ -157,4 +227,72 @@ mod tests {
         assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r);
         Ok(())
     }
+
+    #[test]
+    fn test_format_batches_with_maxrows() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+        let batch =
+            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
+                .unwrap();
+
+        #[rustfmt::skip]
+        let all_rows_expected = [
+            "+---+",
+            "| a |",
+            "+---+",
+            "| 1 |",
+            "| 2 |",
+            "| 3 |",
+            "+---+",
+        ].join("\n");
+
+        #[rustfmt::skip]
+        let one_row_expected = [
+            "+---+",
+            "| a |",
+            "+---+",
+            "| 1 |",
+            "| . |",
+            "| . |",
+            "| . |",
+            "+---+",
+        ].join("\n");
+
+        #[rustfmt::skip]
+        let multi_batches_expected = [
+            "+---+",
+            "| a |",
+            "+---+",
+            "| 1 |",
+            "| 2 |",
+            "| 3 |",
+            "| 1 |",
+            "| 2 |",
+            "| . |",
+            "| . |",
|", + "+---+", + ].join("\n"); + + let no_limit = format_batches_with_maxrows(&[batch.clone()], MaxRows::Unlimited)?; + assert_eq!(all_rows_expected, no_limit); + + let maxrows_less_than_actual = + format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(1))?; + assert_eq!(one_row_expected, maxrows_less_than_actual); + let maxrows_more_than_actual = + format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(5))?; + assert_eq!(all_rows_expected, maxrows_more_than_actual); + let maxrows_equals_actual = + format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(3))?; + assert_eq!(all_rows_expected, maxrows_equals_actual); + let multi_batches = format_batches_with_maxrows( + &[batch.clone(), batch.clone(), batch.clone()], + MaxRows::Limited(5), + )?; + assert_eq!(multi_batches_expected, multi_batches); + + Ok(()) + } } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 33ba7ef0863e..0a6c8d4c36fc 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -18,37 +18,89 @@ use crate::print_format::PrintFormat; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result; +use std::fmt::{Display, Formatter}; +use std::str::FromStr; use std::time::Instant; +#[derive(Debug, Clone, PartialEq, Copy)] +pub enum MaxRows { + /// show all rows in the output + Unlimited, + /// Only show n rows + Limited(usize), +} + +impl FromStr for MaxRows { + type Err = String; + + fn from_str(maxrows: &str) -> Result { + if maxrows.to_lowercase() == "inf" + || maxrows.to_lowercase() == "infinite" + || maxrows.to_lowercase() == "none" + { + Ok(Self::Unlimited) + } else { + match maxrows.parse::() { + Ok(nrows) => Ok(Self::Limited(nrows)), + _ => Err(format!("Invalid maxrows {}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.", maxrows)), + } + } + } +} + +impl Display for MaxRows { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unlimited => write!(f, "unlimited"), + Self::Limited(max_rows) => write!(f, "at most {max_rows}"), + } + } +} + #[derive(Debug, Clone)] pub struct PrintOptions { pub format: PrintFormat, pub quiet: bool, + pub maxrows: MaxRows, } -fn print_timing_info(row_count: usize, now: Instant) { - println!( - "{} {} in set. Query took {:.3} seconds.\n", +fn get_timing_info_str( + row_count: usize, + maxrows: MaxRows, + query_start_time: Instant, +) -> String { + let row_word = if row_count == 1 { "row" } else { "rows" }; + let nrows_shown_msg = match maxrows { + MaxRows::Limited(nrows) if nrows < row_count => format!(" ({} shown)", nrows), + _ => String::new(), + }; + + format!( + "{} {} in set{}. 
         row_count,
-        if row_count == 1 { "row" } else { "rows" },
-        now.elapsed().as_secs_f64()
-    );
+        row_word,
+        nrows_shown_msg,
+        query_start_time.elapsed().as_secs_f64()
+    )
 }
 
 impl PrintOptions {
     /// print the batches to stdout using the specified format
-    pub fn print_batches(&self, batches: &[RecordBatch], now: Instant) -> Result<()> {
-        if batches.is_empty() {
-            if !self.quiet {
-                print_timing_info(0, now);
-            }
-        } else {
-            self.format.print_batches(batches)?;
-            if !self.quiet {
-                let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
-                print_timing_info(row_count, now);
-            }
+    pub fn print_batches(
+        &self,
+        batches: &[RecordBatch],
+        query_start_time: Instant,
+    ) -> Result<()> {
+        let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
+        // Elapsed time should not count time for printing batches
+        let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time);
+
+        self.format.print_batches(batches, self.maxrows)?;
+
+        if !self.quiet {
+            println!("{timing_info}");
         }
+
         Ok(())
     }
 }
diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md
index e1f332baf38b..05b4165e612f 100644
--- a/docs/source/user-guide/cli.md
+++ b/docs/source/user-guide/cli.md
@@ -75,17 +75,42 @@ USAGE:
     datafusion-cli [OPTIONS]
 
 OPTIONS:
-    -c, --batch-size <BATCH_SIZE>    The batch size of each query, or use DataFusion default
-    -f, --file <FILE>...             Execute commands from file(s), then exit
-        --format <FORMAT>            [default: table] [possible values: csv, tsv, table, json,
-                                     nd-json]
-    -h, --help                       Print help information
-    -m, --memory-limit <MEMORY_LIMIT>    The memory pool limitation (e.g. '10g'), default to None (no limit)
-        --mem-pool-type <MEM_POOL_TYPE>    Specify the memory pool type 'greedy' or 'fair', default to 'greedy'
-    -p, --data-path <DATA_PATH>      Path to your data, default to current directory
-    -q, --quiet                      Reduce printing other than the results and work quietly
-    -r, --rc <RC>...                 Run the provided files on startup instead of ~/.datafusionrc
-    -V, --version                    Print version information
+    -b, --batch-size <BATCH_SIZE>
+            The batch size of each query, or use DataFusion default
+
+    -c, --command <COMMAND>...
+            Execute the given command string(s), then exit
+
+    -f, --file <FILE>...
+            Execute commands from file(s), then exit
+
+        --format <FORMAT>
+            [default: table] [possible values: csv, tsv, table, json, nd-json]
+
+    -h, --help
+            Print help information
+
+    -m, --memory-limit <MEMORY_LIMIT>
+            The memory pool limitation (e.g. '10g'), default to None (no limit)
+
+        --maxrows <MAXROWS>
+            The max number of rows to display for 'Table' format
+            [default: 40] [possible values: numbers(0/10/...), inf(no limit)]
+
+        --mem-pool-type <MEM_POOL_TYPE>
+            Specify the memory pool type 'greedy' or 'fair', default to 'greedy'
+
+    -p, --data-path <DATA_PATH>
+            Path to your data, default to current directory
+
+    -q, --quiet
+            Reduce printing other than the results and work quietly
+
+    -r, --rc <RC>...
+            Run the provided files on startup instead of ~/.datafusionrc
+
+    -V, --version
+            Print version information
 ```
 
 ## Querying data from the files directly
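Taken together with the documentation change above, the new option can be exercised as below. This is an illustrative usage sketch rather than output captured from a real session; the flags shown (`--maxrows`, `-c`) are the ones added or documented by this patch:

```bash
# Cap 'Table' output at 10 rows: rows past the cap are replaced by three
# "| . |"-style filler lines, and the summary line becomes
# "N rows in set (10 shown). Query took ... seconds."
datafusion-cli --maxrows 10

# 'inf' (or 'none' / 'infinite') removes the cap; 0 suppresses the table entirely
datafusion-cli --maxrows inf

# Per the exec.rs change, EXPLAIN / ANALYZE / DESCRIBE output is never truncated
datafusion-cli --maxrows 5 -c "EXPLAIN SELECT 1"
```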