feat: Better large output display in datafusion-cli with --maxrows option (#7617)

* Better large output display for CLI

* review comments
2010YOUY01 authored Sep 22, 2023
1 parent ee9078d commit 963680d
Showing 5 changed files with 280 additions and 35 deletions.
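Before the per-file diffs, a minimal sketch (not part of the commit) of the new knob from the library side. It assumes the datafusion-cli crate at this commit is available as a library dependency; the --maxrows flag added in main.rs below simply populates the new maxrows field on PrintOptions.

use datafusion_cli::print_format::PrintFormat;
use datafusion_cli::print_options::{MaxRows, PrintOptions};

fn main() {
    // Roughly what `datafusion-cli --maxrows 40` builds internally (40 is the default);
    // `--maxrows inf` would map to MaxRows::Unlimited instead.
    let options = PrintOptions {
        format: PrintFormat::Table,
        quiet: false,
        maxrows: MaxRows::Limited(40),
    };

    // MaxRows implements Display, so the limit can be reported directly.
    println!("display limit: {}", options.maxrows); // "display limit: at most 40"
    println!("no limit: {}", MaxRows::Unlimited);    // "no limit: unlimited"
}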
21 changes: 20 additions & 1 deletion datafusion-cli/src/exec.rs
@@ -24,7 +24,7 @@ use crate::{
get_gcs_object_store_builder, get_oss_object_store_builder,
get_s3_object_store_builder,
},
print_options::PrintOptions,
print_options::{MaxRows, PrintOptions},
};
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::{
@@ -211,6 +211,15 @@ async fn exec_and_print(
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let plan = ctx.state().statement_to_plan(statement).await?;

// For plans like `Explain` ignore `MaxRows` option and always display all rows
let should_ignore_maxrows = matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
);

let df = match &plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => {
create_external_table(ctx, cmd).await?;
@@ -220,8 +229,18 @@
};

let results = df.collect().await?;

let print_options = if should_ignore_maxrows {
PrintOptions {
maxrows: MaxRows::Unlimited,
..print_options.clone()
}
} else {
print_options.clone()
};
print_options.print_batches(&results, now)?;
}

Ok(())
}

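The override above relies on Rust's struct update syntax: only maxrows is replaced, while the remaining PrintOptions fields are copied from the original options. A self-contained sketch of the same pattern (the boolean below stands in for the matches! check on the LogicalPlan; datafusion-cli at this commit is assumed as a dependency):

use datafusion_cli::print_format::PrintFormat;
use datafusion_cli::print_options::{MaxRows, PrintOptions};

fn main() {
    let print_options = PrintOptions {
        format: PrintFormat::Table,
        quiet: false,
        maxrows: MaxRows::Limited(40),
    };

    // Stand-in for `matches!(plan, LogicalPlan::Explain(_) | ...)` in exec_and_print.
    let should_ignore_maxrows = true;

    // EXPLAIN / DESCRIBE / ANALYZE output is never truncated: only `maxrows` is
    // overridden, everything else is cloned from the original options.
    let effective = if should_ignore_maxrows {
        PrintOptions {
            maxrows: MaxRows::Unlimited,
            ..print_options.clone()
        }
    } else {
        print_options.clone()
    };

    println!("effective limit: {}", effective.maxrows); // "effective limit: unlimited"
}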
13 changes: 12 additions & 1 deletion datafusion-cli/src/main.rs
@@ -23,7 +23,10 @@ use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
exec,
print_format::PrintFormat,
print_options::{MaxRows, PrintOptions},
DATAFUSION_CLI_VERSION,
};
use mimalloc::MiMalloc;
use std::collections::HashMap;
@@ -122,6 +125,13 @@ struct Args {
help = "Specify the memory pool type 'greedy' or 'fair', default to 'greedy'"
)]
mem_pool_type: Option<PoolType>,

#[clap(
long,
help = "The max number of rows to display for 'Table' format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]",
default_value = "40"
)]
maxrows: MaxRows,
}

#[tokio::main]
@@ -179,6 +189,7 @@ pub async fn main() -> Result<()> {
let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
maxrows: args.maxrows,
};

let commands = args.command;
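The help text above says the flag accepts plain numbers or inf; clap parses the value through the FromStr impl added in print_options.rs further down. A small sketch of the accepted spellings (assuming datafusion-cli at this commit as a dependency):

use std::str::FromStr;

use datafusion_cli::print_options::MaxRows;

fn main() {
    // Plain numbers give a fixed limit; 0 suppresses the table output entirely.
    assert_eq!(MaxRows::from_str("40"), Ok(MaxRows::Limited(40)));
    assert_eq!(MaxRows::from_str("0"), Ok(MaxRows::Limited(0)));

    // "inf", "infinite" and "none" (case-insensitive) mean no limit.
    assert_eq!(MaxRows::from_str("inf"), Ok(MaxRows::Unlimited));
    assert_eq!(MaxRows::from_str("NONE"), Ok(MaxRows::Unlimited));

    // Anything else (including negative numbers) is rejected with an error message.
    assert!(MaxRows::from_str("-1").is_err());
}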
148 changes: 143 additions & 5 deletions datafusion-cli/src/print_format.rs
@@ -16,6 +16,7 @@
// under the License.

//! Print format variants
use crate::print_options::MaxRows;
use arrow::csv::writer::WriterBuilder;
use arrow::json::{ArrayWriter, LineDelimitedWriter};
use arrow::util::pretty::pretty_format_batches_with_options;
@@ -70,17 +71,86 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<Stri
Ok(formatted)
}

fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
let lines: Vec<String> = s.lines().map(String::from).collect();

assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border

let last_line = &lines[lines.len() - 1]; // bottom border line

let spaces = last_line.len().saturating_sub(4);
let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);

let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and `maxrows` lines
result.extend(vec![dotted_line; 3]); // Append ... lines
result.push(last_line.clone());

result.join("\n")
}

fn format_batches_with_maxrows(
batches: &[RecordBatch],
maxrows: MaxRows,
) -> Result<String> {
match maxrows {
MaxRows::Limited(maxrows) => {
// Only format enough batches for maxrows
let mut filtered_batches = Vec::new();
let mut batches = batches;
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
if row_count > maxrows {
let mut accumulated_rows = 0;

for batch in batches {
filtered_batches.push(batch.clone());
if accumulated_rows + batch.num_rows() > maxrows {
break;
}
accumulated_rows += batch.num_rows();
}

batches = &filtered_batches;
}

let mut formatted = format!(
"{}",
pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
);

if row_count > maxrows {
formatted = keep_only_maxrows(&formatted, maxrows);
}

Ok(formatted)
}
MaxRows::Unlimited => {
// maxrows not specified, print all rows
Ok(format!(
"{}",
pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
))
}
}
}

impl PrintFormat {
/// print the batches to stdout using the specified format
pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> {
/// The `maxrows` option is only used for the `Table` format:
/// If `maxrows` is `MaxRows::Limited(n)`, then at most n rows will be displayed
/// If `maxrows` is `MaxRows::Unlimited`, then every row will be displayed
pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) -> Result<()> {
if batches.is_empty() {
return Ok(());
}

match self {
Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
Self::Table => {
println!(
"{}",
pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?
)
if maxrows == MaxRows::Limited(0) {
return Ok(());
}
println!("{}", format_batches_with_maxrows(batches, maxrows)?,)
}
Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)),
Self::NdJson => {
Expand Down Expand Up @@ -157,4 +227,72 @@ mod tests {
assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r);
Ok(())
}

#[test]
fn test_format_batches_with_maxrows() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

let batch =
RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
.unwrap();

#[rustfmt::skip]
let all_rows_expected = [
"+---+",
"| a |",
"+---+",
"| 1 |",
"| 2 |",
"| 3 |",
"+---+",
].join("\n");

#[rustfmt::skip]
let one_row_expected = [
"+---+",
"| a |",
"+---+",
"| 1 |",
"| . |",
"| . |",
"| . |",
"+---+",
].join("\n");

#[rustfmt::skip]
let multi_batches_expected = [
"+---+",
"| a |",
"+---+",
"| 1 |",
"| 2 |",
"| 3 |",
"| 1 |",
"| 2 |",
"| . |",
"| . |",
"| . |",
"+---+",
].join("\n");

let no_limit = format_batches_with_maxrows(&[batch.clone()], MaxRows::Unlimited)?;
assert_eq!(all_rows_expected, no_limit);

let maxrows_less_than_actual =
format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(1))?;
assert_eq!(one_row_expected, maxrows_less_than_actual);
let maxrows_more_than_actual =
format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(5))?;
assert_eq!(all_rows_expected, maxrows_more_than_actual);
let maxrows_equals_actual =
format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(3))?;
assert_eq!(all_rows_expected, maxrows_equals_actual);
let multi_batches = format_batches_with_maxrows(
&[batch.clone(), batch.clone(), batch.clone()],
MaxRows::Limited(5),
)?;
assert_eq!(multi_batches_expected, multi_batches);

Ok(())
}
}
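The tests above exercise the private formatting helper; the public entry point is PrintFormat::print_batches, which the doc comment describes. A runnable sketch of the Table path (assuming the datafusion and datafusion-cli crates at this commit as dependencies):

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion_cli::print_format::PrintFormat;
use datafusion_cli::print_options::MaxRows;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    // With a limit of 1, the table shows the header, one data row and three
    // "| . |" filler lines -- matching `one_row_expected` in the tests above.
    PrintFormat::Table.print_batches(&[batch], MaxRows::Limited(1))?;
    Ok(())
}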
86 changes: 69 additions & 17 deletions datafusion-cli/src/print_options.rs
@@ -18,37 +18,89 @@
use crate::print_format::PrintFormat;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::time::Instant;

#[derive(Debug, Clone, PartialEq, Copy)]
pub enum MaxRows {
/// show all rows in the output
Unlimited,
/// Only show n rows
Limited(usize),
}

impl FromStr for MaxRows {
type Err = String;

fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
if maxrows.to_lowercase() == "inf"
|| maxrows.to_lowercase() == "infinite"
|| maxrows.to_lowercase() == "none"
{
Ok(Self::Unlimited)
} else {
match maxrows.parse::<usize>() {
Ok(nrows) => Ok(Self::Limited(nrows)),
_ => Err(format!("Invalid maxrows {}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.", maxrows)),
}
}
}
}

impl Display for MaxRows {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Unlimited => write!(f, "unlimited"),
Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
}
}
}

#[derive(Debug, Clone)]
pub struct PrintOptions {
pub format: PrintFormat,
pub quiet: bool,
pub maxrows: MaxRows,
}

fn print_timing_info(row_count: usize, now: Instant) {
println!(
"{} {} in set. Query took {:.3} seconds.\n",
fn get_timing_info_str(
row_count: usize,
maxrows: MaxRows,
query_start_time: Instant,
) -> String {
let row_word = if row_count == 1 { "row" } else { "rows" };
let nrows_shown_msg = match maxrows {
MaxRows::Limited(nrows) if nrows < row_count => format!(" ({} shown)", nrows),
_ => String::new(),
};

format!(
"{} {} in set{}. Query took {:.3} seconds.\n",
row_count,
if row_count == 1 { "row" } else { "rows" },
now.elapsed().as_secs_f64()
);
row_word,
nrows_shown_msg,
query_start_time.elapsed().as_secs_f64()
)
}

impl PrintOptions {
/// print the batches to stdout using the specified format
pub fn print_batches(&self, batches: &[RecordBatch], now: Instant) -> Result<()> {
if batches.is_empty() {
if !self.quiet {
print_timing_info(0, now);
}
} else {
self.format.print_batches(batches)?;
if !self.quiet {
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
print_timing_info(row_count, now);
}
pub fn print_batches(
&self,
batches: &[RecordBatch],
query_start_time: Instant,
) -> Result<()> {
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
// Elapsed time should not count time for printing batches
let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time);

self.format.print_batches(batches, self.maxrows)?;

if !self.quiet {
println!("{timing_info}");
}

Ok(())
}
}
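Putting this file's pieces together, a hedged end-to-end sketch of PrintOptions::print_batches (again assuming datafusion and datafusion-cli at this commit as dependencies). The elapsed time comes from the Instant passed in, which the CLI starts before executing the statement, and the timing string is built before the batches are rendered so printing time is excluded:

use std::sync::Arc;
use std::time::Instant;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion_cli::print_format::PrintFormat;
use datafusion_cli::print_options::{MaxRows, PrintOptions};

fn main() -> Result<()> {
    let options = PrintOptions {
        format: PrintFormat::Table,
        quiet: false,
        maxrows: MaxRows::Limited(1),
    };

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    // Prints the truncated table followed by a footer along the lines of
    // "3 rows in set (1 shown). Query took 0.000 seconds." -- the "(1 shown)"
    // suffix only appears when maxrows is smaller than the actual row count.
    let query_start_time = Instant::now();
    options.print_batches(&[batch], query_start_time)?;
    Ok(())
}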
