Skip to content

Commit

Permalink
Fix regression in updating equi-join optimization (#686)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jul 12, 2024
1 parent d127018 commit b85bf10
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 42 deletions.
4 changes: 4 additions & 0 deletions crates/arroyo-planner/src/extension/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,29 @@ impl ArroyoExtension for JoinExtension {
join_plan.clone(),
&ArroyoPhysicalExtensionCodec::default(),
)?;

let operator_name = if self.is_instant {
OperatorName::InstantJoin
} else {
OperatorName::Join
};

let config = JoinOperator {
name: format!("join_{}", index),
left_schema: Some(left_schema.as_ref().clone().into()),
right_schema: Some(right_schema.as_ref().clone().into()),
output_schema: Some(self.output_schema().into()),
join_plan: physical_plan_node.encode_to_vec(),
};

let logical_node = LogicalNode {
operator_id: format!("join_{}", index),
description: "join".to_string(),
operator_name,
operator_config: config.encode_to_vec(),
parallelism: 1,
};

let left_edge =
LogicalEdge::project_all(LogicalEdgeType::LeftJoin, left_schema.as_ref().clone());
let right_edge =
Expand Down
4 changes: 3 additions & 1 deletion crates/arroyo-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use datafusion::logical_expr::expr_rewriter::FunctionRewrite;
use std::time::{Duration, SystemTime};
use std::{collections::HashMap, sync::Arc};
use syn::Item;
use tracing::{info, warn};
use tracing::{debug, info, warn};
use unicase::UniCase;

const DEFAULT_IDLE_TIME: Option<Duration> = Some(Duration::from_secs(5 * 60));
Expand Down Expand Up @@ -553,6 +553,8 @@ pub async fn parse_and_get_arrow_program(

let plan_rewrite = rewrite_plan(plan, &schema_provider)?;

debug!("Plan = {:?}", plan_rewrite);

let mut metadata = SourceMetadataVisitor::new(&schema_provider);
plan_rewrite.visit(&mut metadata)?;
used_connections.extend(metadata.connection_ids.iter());
Expand Down
5 changes: 5 additions & 0 deletions crates/arroyo-planner/src/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl JoinRewriter {
name: &'static str,
) -> Result<LogicalPlan> {
let key_count = join_expressions.len();

let mut join_expressions: Vec<_> = join_expressions
.into_iter()
.enumerate()
Expand Down Expand Up @@ -263,6 +264,10 @@ impl TreeNodeRewriter for JoinRewriter {
};
Self::check_updating(&left, &right)?;

if on.is_empty() && !is_instant {
return not_impl_err!("Updating joins must include an equijoin condition");
}

let (left_expressions, right_expressions): (Vec<_>, Vec<_>) =
on.clone().into_iter().unzip();

Expand Down
19 changes: 9 additions & 10 deletions crates/arroyo-planner/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ use std::{collections::HashMap, time::Duration};
use arrow_schema::{DataType, Field, FieldRef, Schema};
use arroyo_connectors::connector_for_type;

use crate::extension::remote_table::RemoteTableExtension;
use crate::types::convert_data_type;
use crate::{
external::{ProcessingMode, SqlSource},
ArroyoSchemaProvider,
};
use crate::{rewrite_plan, DEFAULT_IDLE_TIME};
use arroyo_datastream::default_sink;
use arroyo_operator::connector::Connection;
use arroyo_rpc::api_types::connections::{
Expand All @@ -29,6 +36,7 @@ use datafusion::optimizer::eliminate_limit::EliminateLimit;
use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
use datafusion::optimizer::eliminate_one_union::EliminateOneUnion;
use datafusion::optimizer::eliminate_outer_join::EliminateOuterJoin;
use datafusion::optimizer::extract_equijoin_predicate::ExtractEquijoinPredicate;
use datafusion::optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion::optimizer::propagate_empty_relation::PropagateEmptyRelation;
use datafusion::optimizer::push_down_filter::PushDownFilter;
Expand All @@ -50,14 +58,6 @@ use datafusion::{
},
};

use crate::extension::remote_table::RemoteTableExtension;
use crate::types::convert_data_type;
use crate::{
external::{ProcessingMode, SqlSource},
ArroyoSchemaProvider,
};
use crate::{rewrite_plan, DEFAULT_IDLE_TIME};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectorTable {
pub id: Option<i64>,
Expand Down Expand Up @@ -124,8 +124,7 @@ fn produce_optimized_plan(
Arc::new(EliminateJoin::new()),
Arc::new(DecorrelatePredicateSubquery::new()),
Arc::new(ScalarSubqueryToJoin::new()),
// Breaks window joins
// Arc::new(ExtractEquijoinPredicate::new()),
Arc::new(ExtractEquijoinPredicate::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
Arc::new(EliminateDuplicatedExpr::new()),
Expand Down
30 changes: 30 additions & 0 deletions crates/arroyo-planner/src/test/queries/updating_filter_join.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
--fail=Updating joins must include an equijoin condition

CREATE TABLE cars (
timestamp TIMESTAMP,
car_id TEXT,
driver_id BIGINT,
event_type TEXT,
location TEXT
) WITH (
connector = 'single_file',
path = '$input_dir/cars.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);

CREATE TABLE passengers (
timestamp TIMESTAMP,
passenger_id BIGINT
) WITH (
connector = 'single_file',
path = '$input_dir/cars.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);

select passenger_id, car_id
from passengers
join cars ON passenger_id < car_id;
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ INNER JOIN (
COUNT(distinct driver_id) as pickup_drivers FROM cars where event_type = 'pickup'
GROUP BY 1
) pickups
ON dropoffs.window.start = pickups.window.start)
ON dropoffs.window = pickups.window)
54 changes: 50 additions & 4 deletions crates/arroyo-sql-testing/golden_outputs/updating_inner_join.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,50 @@
{"before":null,"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"c"}
{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"}
{"before":null,"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"c"}
{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"}
{"before":null,"after":{"left_count":1,"right_count":1},"op":"c"}
{"before":null,"after":{"left_count":3,"right_count":3},"op":"c"}
{"before":null,"after":{"left_count":5,"right_count":5},"op":"c"}
{"before":null,"after":{"left_count":7,"right_count":7},"op":"c"}
{"before":null,"after":{"left_count":9,"right_count":9},"op":"c"}
{"before":null,"after":{"left_count":11,"right_count":11},"op":"c"}
{"before":null,"after":{"left_count":13,"right_count":13},"op":"c"}
{"before":null,"after":{"left_count":15,"right_count":15},"op":"c"}
{"before":null,"after":{"left_count":17,"right_count":17},"op":"c"}
{"before":null,"after":{"left_count":19,"right_count":19},"op":"c"}
{"before":null,"after":{"left_count":21,"right_count":21},"op":"c"}
{"before":null,"after":{"left_count":23,"right_count":23},"op":"c"}
{"before":null,"after":{"left_count":25,"right_count":25},"op":"c"}
{"before":null,"after":{"left_count":27,"right_count":27},"op":"c"}
{"before":null,"after":{"left_count":29,"right_count":29},"op":"c"}
{"before":null,"after":{"left_count":31,"right_count":31},"op":"c"}
{"before":null,"after":{"left_count":33,"right_count":33},"op":"c"}
{"before":null,"after":{"left_count":35,"right_count":35},"op":"c"}
{"before":null,"after":{"left_count":37,"right_count":37},"op":"c"}
{"before":null,"after":{"left_count":39,"right_count":39},"op":"c"}
{"before":null,"after":{"left_count":41,"right_count":41},"op":"c"}
{"before":null,"after":{"left_count":43,"right_count":43},"op":"c"}
{"before":null,"after":{"left_count":45,"right_count":45},"op":"c"}
{"before":null,"after":{"left_count":47,"right_count":47},"op":"c"}
{"before":null,"after":{"left_count":49,"right_count":49},"op":"c"}
{"before":null,"after":{"left_count":51,"right_count":51},"op":"c"}
{"before":null,"after":{"left_count":53,"right_count":53},"op":"c"}
{"before":null,"after":{"left_count":55,"right_count":55},"op":"c"}
{"before":null,"after":{"left_count":57,"right_count":57},"op":"c"}
{"before":null,"after":{"left_count":59,"right_count":59},"op":"c"}
{"before":null,"after":{"left_count":61,"right_count":61},"op":"c"}
{"before":null,"after":{"left_count":63,"right_count":63},"op":"c"}
{"before":null,"after":{"left_count":65,"right_count":65},"op":"c"}
{"before":null,"after":{"left_count":67,"right_count":67},"op":"c"}
{"before":null,"after":{"left_count":69,"right_count":69},"op":"c"}
{"before":null,"after":{"left_count":71,"right_count":71},"op":"c"}
{"before":null,"after":{"left_count":73,"right_count":73},"op":"c"}
{"before":null,"after":{"left_count":75,"right_count":75},"op":"c"}
{"before":null,"after":{"left_count":77,"right_count":77},"op":"c"}
{"before":null,"after":{"left_count":79,"right_count":79},"op":"c"}
{"before":null,"after":{"left_count":81,"right_count":81},"op":"c"}
{"before":null,"after":{"left_count":83,"right_count":83},"op":"c"}
{"before":null,"after":{"left_count":85,"right_count":85},"op":"c"}
{"before":null,"after":{"left_count":87,"right_count":87},"op":"c"}
{"before":null,"after":{"left_count":89,"right_count":89},"op":"c"}
{"before":null,"after":{"left_count":91,"right_count":91},"op":"c"}
{"before":null,"after":{"left_count":93,"right_count":93},"op":"c"}
{"before":null,"after":{"left_count":95,"right_count":95},"op":"c"}
{"before":null,"after":{"left_count":97,"right_count":97},"op":"c"}
{"before":null,"after":{"left_count":99,"right_count":99},"op":"c"}
53 changes: 28 additions & 25 deletions crates/arroyo-sql-testing/src/test/queries/updating_inner_join.sql
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
--fail=Error during planning: can't handle updating right side of join
CREATE TABLE impulse (
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);


CREATE TABLE output (
left_counter bigint,
counter_mod_2 bigint,
right_count bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);
CREATE VIEW impulse_odd AS (
SELECT * FROM impulse
WHERE counter % 2 == 1
);

INSERT INTO output
select counter as left_counter, counter_mod_2, right_count from impulse inner join
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
on counter = right_count where counter < 3;
CREATE TABLE output (
left_count bigint,
right_count bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);

INSERT INTO output
SELECT A.counter, B.counter
FROM impulse A
JOIN impulse_odd B ON A.counter = B.counter;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
--fail=Error during planning: can't handle updating right side of join
CREATE TABLE impulse (
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source',
event_time_field = 'timestamp'
);


CREATE TABLE output (
left_counter bigint,
counter_mod_2 bigint,
right_count bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);

INSERT INTO output
select counter as left_counter, counter_mod_2, right_count from impulse inner join
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
on counter = right_count where counter < 3;
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ INNER JOIN (
COUNT(distinct driver_id) as pickup_drivers FROM cars where event_type = 'pickup'
GROUP BY 1
) pickups
ON dropoffs.window.start = pickups.window.start)
ON dropoffs.window = pickups.window)

0 comments on commit b85bf10

Please sign in to comment.