Skip to content

Error if subqueries contain more than one result #5651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ __pycache__
.mypy_cache
venv
node_modules
**/*.swp
2 changes: 2 additions & 0 deletions src/dataflow/src/render/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl FilterPlan {
return Err("Unsupported temporal predicate: `mz_logical_timestamp()` must be directly compared to a non-temporal expression ".to_string());
}
}
// Order predicates to put literal errors last.
normal.sort_by_key(|p| p.is_literal_err());

Ok(Self {
normal,
Expand Down
4 changes: 3 additions & 1 deletion src/expr/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,10 @@ impl MapFilterProject {
self.predicates.push((max_support, predicate))
}
// Stable sort predicates by position at which they take effect.
// We put literal errors at the end as a stop-gap to avoid erroring
// before we are able to evaluate any predicates that might prevent it.
self.predicates
.sort_by_key(|(position, _predicate)| *position);
.sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
self
}

Expand Down
66 changes: 65 additions & 1 deletion src/expr/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,71 @@ impl MirRelationExpr {
.unwrap();
}
}
RelationType::new(base_cols)

// Generally, unions do not have any unique keys, because
// each input might duplicate some. However, there is at
// least one idiomatic structure that does preserve keys,
// which results from SQL aggregations that must populate
// absent records with default values. In that pattern —
// the union of one GET with its negation, which has first
// been subjected to a projection and map — we can remove
// their influence on the key structure.
//
// If there are A, B, each with a unique `key` such that
// we are looking at
//
// A + (B - A.proj(key)).map(stuff)
//
// Then we can report `key` as a unique key.
//
// TODO: make unique key structure an optimization analysis
// rather than part of the type information.
// TODO: perhaps ensure that (above) A.proj(key) is a
// subset of B, as otherwise there are negative records
// and who knows what is true (not expected, but again
// who knows what the query plan might look like).
Comment on lines +477 to +480
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this why we are looking for such a complicated pattern instead of just B-A, where B has a unique key and A also has the same unique key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow exactly, but .. if A is not a subset of B then there could be negative records.

let mut keys = Vec::new();
if let MirRelationExpr::Get {
id: first_id,
typ: _,
} = &**base
{
if inputs.len() == 1 {
if let MirRelationExpr::Map { input, .. } = &inputs[0] {
if let MirRelationExpr::Union { base, inputs } = &**input {
if inputs.len() == 1 {
if let MirRelationExpr::Project { input, outputs } = &**base {
if let MirRelationExpr::Negate { input } = &**input {
if let MirRelationExpr::Get {
id: second_id,
typ: _,
} = &**input
{
if first_id == second_id {
keys.extend(
inputs[0].typ().keys.drain(..).filter(
|key| {
key.iter().all(|c| {
outputs.get(*c) == Some(c)
})
},
),
);
}
}
}
}
}
}
}
}
}

let mut result = RelationType::new(base_cols);
for key in keys {
result = result.with_key(key);
}
result
// Important: do not inherit keys of either input, as not unique.
}
MirRelationExpr::ArrangeBy { input, .. } => input.typ(),
Expand Down
4 changes: 4 additions & 0 deletions src/expr/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ pub enum EvalError {
InfinityOutOfDomain(String),
NegativeOutOfDomain(String),
ZeroOutOfDomain(String),
MultipleRowsFromSubquery,
}

impl fmt::Display for EvalError {
Expand Down Expand Up @@ -965,6 +966,9 @@ impl fmt::Display for EvalError {
EvalError::ZeroOutOfDomain(s) => {
write!(f, "function {} is not defined for zero", s)
}
EvalError::MultipleRowsFromSubquery => {
write!(f, "more than one record produced in subquery")
}
}
}
}
Expand Down
56 changes: 54 additions & 2 deletions src/sql/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,13 @@ impl HirRelationExpr {

let left = left.applied_to(id_gen, get_outer, col_map);
left.let_in(id_gen, |id_gen, get_left| {
let apply_requires_distinct_outer = false;
let mut join = branch(
id_gen,
get_left.clone(),
col_map,
*right,
apply_requires_distinct_outer,
|id_gen, right, get_left, col_map| {
right.applied_to(id_gen, get_left, col_map)
},
Expand Down Expand Up @@ -621,11 +623,13 @@ impl HirScalarExpr {
// When the subquery would return 0 rows for some row in the outer query, `subquery.applied_to(get_inner)` will not have any corresponding row.
// Use `lookup` if you need to add default values for cases when the subquery returns 0 rows.
Exists(expr) => {
let apply_requires_distinct_outer = false;
*inner = branch(
id_gen,
inner.take_dangerous(),
col_map,
*expr,
apply_requires_distinct_outer,
|id_gen, expr, get_inner, col_map| {
let exists = expr
// compute for every row in get_inner
Expand All @@ -648,20 +652,58 @@ impl HirScalarExpr {
);
SS::Column(inner.arity() - 1)
}

Select(expr) => {
let apply_requires_distinct_outer = true;
*inner = branch(
id_gen,
inner.take_dangerous(),
col_map,
*expr,
apply_requires_distinct_outer,
|id_gen, expr, get_inner, col_map| {
let select = expr
// compute for every row in get_inner
.applied_to(id_gen, get_inner.clone(), col_map);
let col_type = select.typ().column_types.into_last();

let inner_arity = get_inner.arity();
// We must determine a count for each `get_inner` prefix,
// and report an error if that count exceeds one.
let guarded = select.let_in(id_gen, |_id_gen, get_select| {
// Count for each `get_inner` prefix.
let counts = get_select.clone().reduce(
(0..inner_arity).collect::<Vec<_>>(),
vec![expr::AggregateExpr {
func: expr::AggregateFunc::Count,
expr: expr::MirScalarExpr::literal_ok(
Datum::True,
ScalarType::Bool,
),
distinct: false,
}],
None,
);
// Errors should result from counts > 1.
let errors = counts
.filter(vec![expr::MirScalarExpr::Column(inner_arity).call_binary(
expr::MirScalarExpr::literal_ok(
Datum::Int64(1),
ScalarType::Int64,
),
expr::BinaryFunc::Gt,
)])
.project((0..inner_arity).collect::<Vec<_>>())
.map(vec![expr::MirScalarExpr::literal(
Err(expr::EvalError::MultipleRowsFromSubquery),
col_type.clone().scalar_type,
)]);
// Return `get_select` and any errors added in.
get_select.union(errors)
});
// append Null to anything that didn't return any rows
let default = vec![(Datum::Null, col_type.nullable(true))];
get_inner.lookup(id_gen, select, default)
get_inner.lookup(id_gen, guarded, default)
},
);
SS::Column(inner.arity() - 1)
Expand Down Expand Up @@ -713,13 +755,23 @@ impl HirScalarExpr {
/// not depended upon by `inner` are thrown away before the distinct, so that we
/// don't perform needless computation of `inner`.
///
/// `branch` will inspect the contents of `inner` to determine whether `inner`
/// is not multiplicity sensitive (roughly, contains only maps, filters,
/// projections, and calls to table functions). If it is not multiplicity
/// sensitive, `branch` will *not* distinctify outer. If this is problematic,
/// e.g. because the `apply` callback itself introduces multiplicity-sensitive
/// operations that were not present in `inner`, then set
/// `apply_requires_distinct_outer` to ensure that `branch` chooses the plan
/// that distinctifies `outer`.
///
/// The caller must supply the `apply` function that applies the rewritten
/// `inner` to `outer`.
fn branch<F>(
id_gen: &mut expr::IdGen,
outer: expr::MirRelationExpr,
col_map: &ColumnMap,
mut inner: HirRelationExpr,
apply_requires_distinct_outer: bool,
apply: F,
) -> expr::MirRelationExpr
where
Expand Down Expand Up @@ -786,7 +838,7 @@ where
| HirRelationExpr::CallTable { .. } => (),
_ => is_simple = false,
});
if is_simple {
if is_simple && !apply_requires_distinct_outer {
let new_col_map = col_map.enter_scope(outer.arity() - col_map.len());
return outer.let_in(id_gen, |id_gen, get_outer| {
apply(id_gen, inner, get_outer, &new_col_map)
Expand Down
87 changes: 58 additions & 29 deletions src/transform/src/predicate_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@
//! This action generally improves the quality of the query, in that selective per-record
//! filters reduce the volume of data before they arrive at more expensive operators.
//!
//!
//! The one time when this action might not improve the quality of a query is
//! if a filter gets pushed down on an arrangement because that blocks arrangement
//! reuse. It is assumed that actions that need an arrangement are responsible for
//! lifting filters out of the way.
//!
//! Predicate pushdown will not push down literal errors, unless it is certain that
//! the literal errors will be unconditionally evaluated. For example, the pushdown
//! will not happen if not all predicates can be pushed down (e.g. reduce and map),
//! or if we are not certain that the input is non-empty (e.g. join).
//!
//! ```rust
//! use expr::{BinaryFunc, IdGen, MirRelationExpr, MirScalarExpr};
//! use repr::{ColumnType, Datum, RelationType, ScalarType};
Expand Down Expand Up @@ -158,6 +164,16 @@ impl PredicatePushdown {
// otherwise we should recursively descend.
match relation {
MirRelationExpr::Filter { input, predicates } => {
// Reduce the predicates to determine as best as possible
// whether they are literal errors before working with them.
let input_type = input.typ();
for predicate in predicates.iter_mut() {
predicate.reduce(&input_type);
}

// It can be helpful to know if there are any non-literal errors,
// as this is justification for not pushing down literal errors.
let all_errors = predicates.iter().all(|p| p.is_literal_err());
// Depending on the type of `input` we have different
// logic to apply to consider pushing `predicates` down.
match &mut **input {
Expand Down Expand Up @@ -265,7 +281,10 @@ impl PredicatePushdown {
// equivalences allow the predicate to be rewritten
// in terms of only columns from that input.
for (index, push_down) in push_downs.iter_mut().enumerate() {
if let Some(localized) = input_mapper
if predicate.is_literal_err() {
// Do nothing. We don't push down literal errors,
// as we can't know the join will be non-empty.
} else if let Some(localized) = input_mapper
.try_map_to_input_with_bound_expr(
predicate.clone(),
index,
Expand Down Expand Up @@ -316,30 +335,37 @@ impl PredicatePushdown {
let mut retain = Vec::new();
let mut push_down = Vec::new();
for predicate in predicates.drain(..) {
let mut supported = true;
let mut new_predicate = predicate.clone();
new_predicate.visit_mut(&mut |e| {
if let MirScalarExpr::Column(c) = e {
if *c >= group_key.len() {
supported = false;
}
}
});
if supported {
// Do not push down literal errors unless it is only errors.
if !predicate.is_literal_err() || all_errors {
let mut supported = true;
let mut new_predicate = predicate.clone();
new_predicate.visit_mut(&mut |e| {
if let MirScalarExpr::Column(i) = e {
*e = group_key[*i].clone();
if let MirScalarExpr::Column(c) = e {
if *c >= group_key.len() {
supported = false;
}
}
});
push_down.push(new_predicate);
} else if let MirScalarExpr::Column(col) = &predicate {
if *col == group_key.len()
&& aggregates.len() == 1
&& aggregates[0].func == AggregateFunc::Any
{
push_down.push(aggregates[0].expr.clone());
aggregates[0].expr =
MirScalarExpr::literal_ok(Datum::True, ScalarType::Bool);
if supported {
new_predicate.visit_mut(&mut |e| {
if let MirScalarExpr::Column(i) = e {
*e = group_key[*i].clone();
}
});
push_down.push(new_predicate);
} else if let MirScalarExpr::Column(col) = &predicate {
if *col == group_key.len()
&& aggregates.len() == 1
&& aggregates[0].func == AggregateFunc::Any
{
push_down.push(aggregates[0].expr.clone());
aggregates[0].expr = MirScalarExpr::literal_ok(
Datum::True,
ScalarType::Bool,
);
} else {
retain.push(predicate);
}
} else {
retain.push(predicate);
}
Expand Down Expand Up @@ -402,13 +428,16 @@ impl PredicatePushdown {
// First, check if we can push this predicate down. We can do so if each
// column it references is either from the input or is generated by an
// expression that can be inlined.
if predicate.support().iter().all(|c| {
*c < input_arity
|| PredicatePushdown::can_inline(
&scalars[*c - input_arity],
input_arity,
)
}) {
// We also will not push down literal errors, unless all predicates are.
if (!predicate.is_literal_err() || all_errors)
&& predicate.support().iter().all(|c| {
*c < input_arity
|| PredicatePushdown::can_inline(
&scalars[*c - input_arity],
input_arity,
)
})
{
predicate.visit_mut(&mut |e| {
if let MirScalarExpr::Column(c) = e {
// NB: this inlining would be invalid if can_inline did not
Expand Down
2 changes: 2 additions & 0 deletions src/transform/src/reduction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ impl FoldConstants {
{
relation.take_safely();
} else if let MirRelationExpr::Constant { rows, .. } = &**input {
// Evaluate errors last, to reduce risk of spurious errors.
predicates.sort_by_key(|p| p.is_literal_err());
let new_rows = match rows {
Ok(rows) => Self::fold_filter_constant(predicates, rows),
Err(e) => Err(e.clone()),
Expand Down
7 changes: 1 addition & 6 deletions test/sqllogictest/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,8 @@ SELECT * FROM x WHERE a IN
2
3

# Ensure trivial nesting doesn't crash
query I rowsort
statement error more than one record produced in subquery
WITH c AS (SELECT a + 1 FROM x) SELECT (SELECT * FROM c);
----
2
3
4

# Allow re-using names laterally.
query I rowsort
Expand Down
Loading