Skip to content

Commit

Permalink
count, joinp, sqlp: use new OptFlags instead of OptState
Browse files Browse the repository at this point in the history
optimize init of OptFlags struct
  • Loading branch information
jqnatividad committed Aug 28, 2024
1 parent 4319a9d commit 7762f8f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 43 deletions.
27 changes: 14 additions & 13 deletions src/cmd/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub fn polars_count_input(
low_memory: bool,
) -> Result<(u64, usize), crate::clitypes::CliError> {
use polars::{
lazy::frame::{LazyFrame, OptState},
lazy::frame::{LazyFrame, OptFlags},
prelude::*,
sql::SQLContext,
};
Expand Down Expand Up @@ -240,18 +240,19 @@ pub fn polars_count_input(
return Ok((count_regular, 0));
},
};
let mut optimization_state = OptState::default();
optimization_state |= OptState::PROJECTION_PUSHDOWN
| OptState::PREDICATE_PUSHDOWN
| OptState::CLUSTER_WITH_COLUMNS
| OptState::TYPE_COERCION
| OptState::SIMPLIFY_EXPR
| OptState::FILE_CACHING
| OptState::SLICE_PUSHDOWN
| OptState::COMM_SUBEXPR_ELIM
| OptState::FAST_PROJECTION
| OptState::STREAMING;
ctx.register("sql_lf", lazy_df.with_optimizations(optimization_state));
let optflags = OptFlags::from_bits_truncate(0)
| OptFlags::PROJECTION_PUSHDOWN
| OptFlags::PREDICATE_PUSHDOWN
| OptFlags::CLUSTER_WITH_COLUMNS
| OptFlags::TYPE_COERCION
| OptFlags::SIMPLIFY_EXPR
| OptFlags::FILE_CACHING
| OptFlags::SLICE_PUSHDOWN
| OptFlags::COMM_SUBPLAN_ELIM
| OptFlags::COMM_SUBEXPR_ELIM
| OptFlags::FAST_PROJECTION
| OptFlags::STREAMING;
ctx.register("sql_lf", lazy_df.with_optimizations(optflags));
"SELECT COUNT(*) FROM sql_lf".to_string()
};

Expand Down
34 changes: 22 additions & 12 deletions src/cmd/joinp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,22 +412,32 @@ impl JoinStruct {
JoinCoalesce::JoinSpecific
};

let mut optimization_state = polars::lazy::frame::OptState::default();
if self.streaming {
optimization_state |= OptState::STREAMING;
}
if self.no_optimizations {
optimization_state = OptState::from_bits_truncate(0) | OptState::TYPE_COERCION;
}
let mut optflags = if self.no_optimizations {
OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION
} else {
OptFlags::PROJECTION_PUSHDOWN
| OptFlags::PREDICATE_PUSHDOWN
| OptFlags::CLUSTER_WITH_COLUMNS
| OptFlags::TYPE_COERCION
| OptFlags::SIMPLIFY_EXPR
| OptFlags::FILE_CACHING
| OptFlags::SLICE_PUSHDOWN
| OptFlags::COMM_SUBPLAN_ELIM
| OptFlags::COMM_SUBEXPR_ELIM
| OptFlags::ROW_ESTIMATE
| OptFlags::FAST_PROJECTION
};

optflags.set(OptFlags::STREAMING, self.streaming);

log::debug!("Optimization state: {optimization_state:?}");
// log::debug!("Optimization state: {optflags:?}");

let join_results = if jointype == JoinType::Cross {
// cross join doesn't need join columns
self.left_lf
.with_optimizations(optimization_state)
.with_optimizations(optflags)
.join_builder()
.with(self.right_lf.with_optimizations(optimization_state))
.with(self.right_lf.with_optimizations(optflags))
.how(JoinType::Cross)
.coalesce(coalesce_flag)
.allow_parallel(true)
Expand All @@ -453,9 +463,9 @@ impl JoinStruct {
}

self.left_lf
.with_optimizations(optimization_state)
.with_optimizations(optflags)
.join_builder()
.with(self.right_lf.with_optimizations(optimization_state))
.with(self.right_lf.with_optimizations(optflags))
.left_on(left_selcols)
.right_on(right_selcols)
.how(jointype)
Expand Down
35 changes: 17 additions & 18 deletions src/cmd/sqlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ use polars::{
io::avro::{AvroWriter, Compression as AvroCompression},
prelude::{
CsvWriter, DataFrame, GzipLevel, IpcCompression, IpcWriter, JsonFormat, JsonWriter,
LazyCsvReader, LazyFileListReader, NullValues, OptState, ParquetCompression, ParquetWriter,
LazyCsvReader, LazyFileListReader, NullValues, OptFlags, ParquetCompression, ParquetWriter,
SerWriter, StatisticsOptions, ZstdLevel,
},
sql::SQLContext,
Expand Down Expand Up @@ -596,24 +596,23 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
None
};

let mut optimization_state = if args.flag_no_optimizations {
OptState::empty()
let mut optflags = if args.flag_no_optimizations {
OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION
} else {
OptState::PROJECTION_PUSHDOWN
| OptState::PREDICATE_PUSHDOWN
| OptState::CLUSTER_WITH_COLUMNS
| OptState::TYPE_COERCION
| OptState::SIMPLIFY_EXPR
| OptState::FILE_CACHING
| OptState::SLICE_PUSHDOWN
| OptState::COMM_SUBEXPR_ELIM
| OptState::FAST_PROJECTION
| OptState::ROW_ESTIMATE
OptFlags::PROJECTION_PUSHDOWN
| OptFlags::PREDICATE_PUSHDOWN
| OptFlags::CLUSTER_WITH_COLUMNS
| OptFlags::TYPE_COERCION
| OptFlags::SIMPLIFY_EXPR
| OptFlags::FILE_CACHING
| OptFlags::SLICE_PUSHDOWN
| OptFlags::COMM_SUBPLAN_ELIM
| OptFlags::COMM_SUBEXPR_ELIM
| OptFlags::ROW_ESTIMATE
| OptFlags::FAST_PROJECTION
};

if args.flag_streaming {
optimization_state |= OptState::STREAMING;
}
optflags.set(OptFlags::STREAMING, args.flag_streaming);

// check if the input is a SQL script (ends with .sql)
let is_sql_script = std::path::Path::new(&args.arg_sql)
Expand All @@ -638,7 +637,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// for the optimization state struct
let debuglog_flag = log::log_enabled!(log::Level::Debug);
if debuglog_flag {
log::debug!("Optimization state: {optimization_state:?}");
log::debug!("Optimization state: {optflags:?}");
log::debug!(
"Delimiter: {delim} Infer_schema_len: {infer_len} try_parse_dates: {parse_dates} \
ignore_errors: {ignore_errors}, low_memory: {low_memory}, float_precision: \
Expand Down Expand Up @@ -747,7 +746,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
.with_decimal_comma(args.flag_decimal_comma)
.with_low_memory(args.flag_low_memory)
.finish()?;
ctx.register(table_name, lf.with_optimizations(optimization_state));
ctx.register(table_name, lf.with_optimizations(optflags));
}
}

Expand Down

0 comments on commit 7762f8f

Please sign in to comment.