Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ sqlparser = "0.33"
[dev-dependencies]
ctor = "0.2.0"
env_logger = "0.10"
paste = "^1.0"
rstest = "0.17"
27 changes: 7 additions & 20 deletions datafusion/sql/src/expr/binary_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, DataFusionError, Result};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use sqlparser::ast::{BinaryOperator, Expr as SQLExpr};
use crate::planner::{ContextProvider, SqlToRel};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Operator;
use sqlparser::ast::BinaryOperator;

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(crate) fn parse_sql_binary_op(
&self,
left: SQLExpr,
op: BinaryOperator,
right: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let operator = match op {
pub(crate) fn parse_sql_binary_op(&self, op: BinaryOperator) -> Result<Operator> {
match op {
BinaryOperator::Gt => Ok(Operator::Gt),
BinaryOperator::GtEq => Ok(Operator::GtEq),
BinaryOperator::Lt => Ok(Operator::Lt),
Expand All @@ -56,12 +49,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL binary operator {op:?}"
))),
}?;

Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(self.sql_expr_to_logical_expr(left, schema, planner_context)?),
operator,
Box::new(self.sql_expr_to_logical_expr(right, schema, planner_context)?),
)))
}
}
}
188 changes: 169 additions & 19 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,56 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
// Workaround for https://github.com/apache/arrow-datafusion/issues/4065
//
// Minimize stack space required in debug builds to plan
// deeply nested binary operators by keeping the stack space
// needed for sql_expr_to_logical_expr minimal for BinaryOp
//
// The reason this reduces stack size in debug builds is
// explained in the "Technical Backstory" heading of
// https://github.com/apache/arrow-datafusion/pull/1047
//
// A likely better way to support deeply nested expressions
// would be to avoid recursion all together and use an
// iterative algorithm.
match sql {
SQLExpr::BinaryOp { left, op, right } => {
self.parse_sql_binary_op(*left, op, *right, schema, planner_context)
enum StackEntry {
SQLExpr(Box<SQLExpr>),
Operator(Operator),
}

// Virtual stack machine to convert SQLExpr to Expr
// This allows visiting the expr tree in a depth-first manner which
// produces expressions in postfix notations, i.e. `a + b` => `a b +`.
// See https://github.com/apache/arrow-datafusion/issues/1444
let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
let mut eval_stack = vec![];

while let Some(entry) = stack.pop() {
match entry {
StackEntry::SQLExpr(sql_expr) => {
match *sql_expr {
SQLExpr::BinaryOp { left, op, right } => {
// Note the order that we push the entries to the stack
// is important. We want to visit the left node first.
let op = self.parse_sql_binary_op(op)?;
stack.push(StackEntry::Operator(op));
stack.push(StackEntry::SQLExpr(right));
stack.push(StackEntry::SQLExpr(left));
}
_ => {
let expr = self.sql_expr_to_logical_expr_internal(
*sql_expr,
schema,
planner_context,
)?;
eval_stack.push(expr);
}
}
}
StackEntry::Operator(op) => {
let right = eval_stack.pop().unwrap();
let left = eval_stack.pop().unwrap();
let expr = Expr::BinaryExpr(BinaryExpr::new(
Box::new(left),
op,
Box::new(right),
));
eval_stack.push(expr);
}
}
// since this function requires more space per frame
// avoid calling it for binary ops
_ => self.sql_expr_to_logical_expr_internal(sql, schema, planner_context),
}

assert_eq!(1, eval_stack.len());
let expr = eval_stack.pop().unwrap();
Ok(expr)
}

/// Generate a relational expression from a SQL expression
Expand Down Expand Up @@ -566,3 +595,124 @@ fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
plan_key(key)?,
)))
}

#[cfg(test)]
mod tests {
use super::*;

use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

use datafusion_common::config::ConfigOptions;
use datafusion_expr::logical_plan::builder::LogicalTableSource;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};

use crate::TableReference;

/// A [`ContextProvider`] backed by a fixed, in-memory set of tables,
/// used to plan SQL expressions in unit tests without a real catalog.
struct TestSchemaProvider {
// Session configuration handed back verbatim by `options()`.
options: ConfigOptions,
// Map of table name -> table source, consulted by `get_table_provider`.
tables: HashMap<String, Arc<dyn TableSource>>,
}

impl TestSchemaProvider {
pub fn new() -> Self {
let mut tables = HashMap::new();
tables.insert(
"table1".to_string(),
create_table_source(vec![Field::new(
"column1".to_string(),
DataType::Utf8,
false,
)]),
);

Self {
options: Default::default(),
tables,
}
}
}

impl ContextProvider for TestSchemaProvider {
    /// Looks up a table source by (unqualified) table name, returning a
    /// plan error if the name is not registered.
    fn get_table_provider(
        &self,
        name: TableReference,
    ) -> Result<Arc<dyn TableSource>> {
        let table_name = name.table();
        self.tables.get(table_name).cloned().ok_or_else(|| {
            DataFusionError::Plan(format!("Table not found: {}", table_name))
        })
    }

    // No scalar UDFs are registered in tests.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        None
    }

    // No aggregate UDFs are registered in tests.
    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        None
    }

    // No session variables are available in tests.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        None
    }

    fn options(&self) -> &ConfigOptions {
        &self.options
    }
}

/// Wraps the given fields in a schema (with no metadata) and returns it
/// as a logical-plan-only table source.
fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
    let schema = Schema::new_with_metadata(fields, HashMap::new());
    Arc::new(LogicalTableSource::new(Arc::new(schema)))
}

// Generates a `#[test]` named `test_stack_overflow_<N>` that plans an
// expression of N equality predicates chained with OR. With the recursive
// planner such deeply nested binary expressions overflowed the stack in
// debug builds; these tests verify the (iterative) planner handles them.
// `paste::item!` is used to splice the numeric argument into the test name.
macro_rules! test_stack_overflow {
($num_expr:expr) => {
paste::item! {
#[test]
fn [<test_stack_overflow_ $num_expr>]() {
// Empty schema/context: only the expression shape matters here.
let schema = DFSchema::empty();
let mut planner_context = PlannerContext::default();

// Build "column1 = 'value0' OR column1 = 'value1' OR ..."
// with $num_expr disjuncts — a left-deep BinaryOp tree.
let expr_str = (0..$num_expr)
.map(|i| format!("column1 = 'value{:?}'", i))
.collect::<Vec<String>>()
.join(" OR ");

let dialect = GenericDialect{};
let mut parser = Parser::new(&dialect)
.try_with_sql(expr_str.as_str())
.unwrap();
let sql_expr = parser.parse_expr().unwrap();

let schema_provider = TestSchemaProvider::new();
let sql_to_rel = SqlToRel::new(&schema_provider);

// Should not stack overflow
sql_to_rel.sql_expr_to_logical_expr(
sql_expr,
&schema,
&mut planner_context,
).unwrap();
}
}
};
}

test_stack_overflow!(64);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that without the code in this PR, the tests fail as expected:

running 14 tests
test expr::arrow_cast::test::test_parse_data_type_whitespace_tolerance ... ok
test expr::identifier::test::test_form_identifier ... ok
test expr::identifier::test::test_generate_schema_search_terms ... ok
test expr::arrow_cast::test::parse_data_type_errors ... ok
test expr::arrow_cast::test::test_parse_data_type ... ok
test parser::tests::create_external_table ... ok
test expr::tests::test_stack_overflow_64 ... ok
test expr::tests::test_stack_overflow_128 ... ok
test expr::tests::test_stack_overflow_256 ... ok
test expr::tests::test_stack_overflow_512 ... ok

thread 'expr::tests::test_stack_overflow_1024' has overflowed its stack
fatal runtime error: stack overflow
error: test failed, to rerun pass `-p datafusion-sql --lib`

Caused by:
  process didn't exit successfully: `/Users/alamb/Software/target-df2/debug/deps/datafusion_sql-8a77caa8d397a04b` (signal: 6, SIGABRT: process abort signal)

test_stack_overflow!(128);
test_stack_overflow!(256);
test_stack_overflow!(512);
test_stack_overflow!(1024);
test_stack_overflow!(2048);
test_stack_overflow!(4096);
test_stack_overflow!(8192);
}