Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support inlist in LiteralGuarantee for pruning #8654

Merged
merged 7 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 14 additions & 107 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,10 @@ mod tests {
use arrow::datatypes::DataType::Decimal128;
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF,
TableSource, WindowUDF,
};
use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::{cast, col, lit, Expr};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_sql::planner::ContextProvider;
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::basic::LogicalType;
Expand Down Expand Up @@ -1105,13 +1100,18 @@ mod tests {
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate
let schema = Schema::new(vec![
Field::new("String", DataType::Utf8, false),
Field::new("String3", DataType::Utf8, false),
]);
let sql =
"SELECT * FROM tbl WHERE \"String\" IN ('Hello_Not_Exists', 'Hello_Not_Exists2')";
let expr = sql_to_physical_plan(sql).unwrap();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This previous test relied on the optimizer, which converts an in_list expr with no more than 3 elements into an and expr. Now we don't need this: logical2physical converts a logical_expr to a physical_expr without running the optimizer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree -- this change makes sense

BTW I tried to improve these tests to avoid duplication in #8435 (not yet merged). I'll try and consolidate this test too when I get a chance

let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);

let expr = col(r#""String""#).in_list(
vec![
lit("Hello_Not_Exists"),
lit("Hello_Not_Exists2"),
lit("Hello_Not_Exists3"),
lit("Hello_Not_Exist4"),
],
false,
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

Expand Down Expand Up @@ -1312,97 +1312,4 @@ mod tests {

Ok(pruned_row_group)
}

/// Plans `sql` against the test schema and lowers the first expression of
/// the analyzed + optimized logical plan into a [`PhysicalExpr`].
///
/// Used by pruning tests that need a physical predicate produced the same
/// way the SQL frontend would produce it (including optimizer rewrites).
fn sql_to_physical_plan(sql: &str) -> Result<Arc<dyn PhysicalExpr>> {
    use datafusion_optimizer::{
        analyzer::Analyzer, optimizer::Optimizer, OptimizerConfig, OptimizerContext,
    };
    use datafusion_sql::{
        planner::SqlToRel,
        sqlparser::{ast::Statement, parser::Parser},
    };
    use sqlparser::dialect::GenericDialect;

    // Parse the SQL text into AST statements using the generic dialect.
    let dialect = GenericDialect {};
    let statements: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
    let first_stmt = &statements[0];

    // Build a logical plan for the first statement against the test catalog.
    let provider = TestSchemaProvider::new();
    let planner = SqlToRel::new(&provider);
    let logical = planner.sql_statement_to_plan(first_stmt.clone()).unwrap();

    // Analyze first, then optimize; fail fast if any rule errors.
    let config = OptimizerContext::new().with_skip_failing_rules(false);
    let analyzed = Analyzer::new().execute_and_check(&logical, config.options(), |_, _| {})?;
    let optimized = Optimizer::new().optimize(&analyzed, &config, |_, _| {})?;

    // Lower the plan's first expression to a physical expression.
    let exprs = optimized.expressions();
    let df_schema = optimized.schema().as_ref().to_owned();
    let tb_schema: Schema = df_schema.clone().into();
    let execution_props = ExecutionProps::new();
    create_physical_expr(&exprs[0], &df_schema, &tb_schema, &execution_props)
}

/// Minimal schema/catalog provider used to plan test SQL without a full
/// session context: an in-memory name -> table-source map plus default
/// config options. Registered via `ContextProvider` (see impl below in
/// the original file).
struct TestSchemaProvider {
// Default `ConfigOptions` handed back to the planner.
options: ConfigOptions,
// Table name -> source; populated in `TestSchemaProvider::new`.
tables: HashMap<String, Arc<dyn TableSource>>,
}

impl TestSchemaProvider {
    /// Creates a provider pre-registered with a single table `tbl` whose
    /// schema is one non-nullable Utf8 column named `String`.
    pub fn new() -> Self {
        // Single-column schema for the only table the tests query.
        let string_column =
            Field::new("String".to_string(), DataType::Utf8, false);
        let tables = HashMap::from([(
            "tbl".to_string(),
            create_table_source(vec![string_column]),
        )]);

        Self {
            options: Default::default(),
            tables,
        }
    }
}

impl ContextProvider for TestSchemaProvider {
    /// Resolves a table by its bare name; returns a plan error when the
    /// table was never registered.
    fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
        if let Some(table) = self.tables.get(name.table()) {
            Ok(table.clone())
        } else {
            datafusion_common::plan_err!("Table not found: {}", name.table())
        }
    }

    // No scalar UDFs are registered for these tests.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        None
    }

    // No aggregate UDFs are registered for these tests.
    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        None
    }

    // System variables are unsupported in this test provider.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        None
    }

    fn options(&self) -> &ConfigOptions {
        &self.options
    }

    // No window UDFs are registered for these tests.
    fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
        None
    }
}

/// Wraps `fields` in a fresh schema and exposes it as a logical-only
/// [`TableSource`] (no execution support, planning metadata only).
fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
    let schema = Arc::new(Schema::new(fields));
    Arc::new(LogicalTableSource::new(schema))
}
}
Loading