Skip to content

Commit

Permalink
feat: support inlist in LiteralGurantee for pruning (#8654)
Browse files Browse the repository at this point in the history
* support inlist in LiteralGuarantee for pruning.

* add more tests

* rm useless notes

* Apply suggestions from code review

Co-authored-by: Huaijin <haohuaijin@gmail.com>

* add tests in row_groups

* Apply suggestions from code review

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* update comment & add more tests

---------

Co-authored-by: Huaijin <haohuaijin@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
4 people authored Dec 28, 2023
1 parent 6403222 commit 1737d49
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 162 deletions.
121 changes: 14 additions & 107 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,10 @@ mod tests {
use arrow::datatypes::DataType::Decimal128;
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF,
TableSource, WindowUDF,
};
use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::{cast, col, lit, Expr};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_sql::planner::ContextProvider;
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::basic::LogicalType;
Expand Down Expand Up @@ -1105,13 +1100,18 @@ mod tests {
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate
let schema = Schema::new(vec![
Field::new("String", DataType::Utf8, false),
Field::new("String3", DataType::Utf8, false),
]);
let sql =
"SELECT * FROM tbl WHERE \"String\" IN ('Hello_Not_Exists', 'Hello_Not_Exists2')";
let expr = sql_to_physical_plan(sql).unwrap();
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);

let expr = col(r#""String""#).in_list(
vec![
lit("Hello_Not_Exists"),
lit("Hello_Not_Exists2"),
lit("Hello_Not_Exists3"),
lit("Hello_Not_Exist4"),
],
false,
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

Expand Down Expand Up @@ -1312,97 +1312,4 @@ mod tests {

Ok(pruned_row_group)
}

fn sql_to_physical_plan(sql: &str) -> Result<Arc<dyn PhysicalExpr>> {
use datafusion_optimizer::{
analyzer::Analyzer, optimizer::Optimizer, OptimizerConfig, OptimizerContext,
};
use datafusion_sql::{
planner::SqlToRel,
sqlparser::{ast::Statement, parser::Parser},
};
use sqlparser::dialect::GenericDialect;

// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
let ast: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
let statement = &ast[0];

// create a logical query plan
let schema_provider = TestSchemaProvider::new();
let sql_to_rel = SqlToRel::new(&schema_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// hard code the return value of now()
let config = OptimizerContext::new().with_skip_failing_rules(false);
let analyzer = Analyzer::new();
let optimizer = Optimizer::new();
// analyze and optimize the logical plan
let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
let plan = optimizer.optimize(&plan, &config, |_, _| {})?;
// convert the logical plan into a physical plan
let exprs = plan.expressions();
let expr = &exprs[0];
let df_schema = plan.schema().as_ref().to_owned();
let tb_schema: Schema = df_schema.clone().into();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, &tb_schema, &execution_props)
}

struct TestSchemaProvider {
options: ConfigOptions,
tables: HashMap<String, Arc<dyn TableSource>>,
}

impl TestSchemaProvider {
pub fn new() -> Self {
let mut tables = HashMap::new();
tables.insert(
"tbl".to_string(),
create_table_source(vec![Field::new(
"String".to_string(),
DataType::Utf8,
false,
)]),
);

Self {
options: Default::default(),
tables,
}
}
}

impl ContextProvider for TestSchemaProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
match self.tables.get(name.table()) {
Some(table) => Ok(table.clone()),
_ => datafusion_common::plan_err!("Table not found: {}", name.table()),
}
}

fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
None
}

fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}

fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
None
}

fn options(&self) -> &ConfigOptions {
&self.options
}

fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}
}

fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
Arc::new(LogicalTableSource::new(Arc::new(Schema::new(fields))))
}
}
Loading

0 comments on commit 1737d49

Please sign in to comment.