From b85bf10f17c49bca28f88314317bcdb2864f4f4f Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Fri, 12 Jul 2024 16:25:31 -0700 Subject: [PATCH] Fix regression in updating equi-join optimization (#686) --- crates/arroyo-planner/src/extension/join.rs | 4 ++ crates/arroyo-planner/src/lib.rs | 4 +- crates/arroyo-planner/src/plan/join.rs | 5 ++ crates/arroyo-planner/src/tables.rs | 19 ++++--- .../src/test/queries/updating_filter_join.sql | 30 +++++++++++ .../src/test/queries/windowed_inner_join.sql | 2 +- .../golden_outputs/updating_inner_join.json | 54 +++++++++++++++++-- .../src/test/queries/updating_inner_join.sql | 53 +++++++++--------- .../updating_inner_join_with_updating.sql | 29 ++++++++++ .../src/test/queries/windowed_inner_join.sql | 2 +- 10 files changed, 160 insertions(+), 42 deletions(-) create mode 100644 crates/arroyo-planner/src/test/queries/updating_filter_join.sql create mode 100644 crates/arroyo-sql-testing/src/test/queries/updating_inner_join_with_updating.sql diff --git a/crates/arroyo-planner/src/extension/join.rs b/crates/arroyo-planner/src/extension/join.rs index 0de00aa9f..c4e7e4b0d 100644 --- a/crates/arroyo-planner/src/extension/join.rs +++ b/crates/arroyo-planner/src/extension/join.rs @@ -42,11 +42,13 @@ impl ArroyoExtension for JoinExtension { join_plan.clone(), &ArroyoPhysicalExtensionCodec::default(), )?; + let operator_name = if self.is_instant { OperatorName::InstantJoin } else { OperatorName::Join }; + let config = JoinOperator { name: format!("join_{}", index), left_schema: Some(left_schema.as_ref().clone().into()), @@ -54,6 +56,7 @@ impl ArroyoExtension for JoinExtension { output_schema: Some(self.output_schema().into()), join_plan: physical_plan_node.encode_to_vec(), }; + let logical_node = LogicalNode { operator_id: format!("join_{}", index), description: "join".to_string(), @@ -61,6 +64,7 @@ impl ArroyoExtension for JoinExtension { operator_config: config.encode_to_vec(), parallelism: 1, }; + let left_edge = LogicalEdge::project_all(LogicalEdgeType::LeftJoin, left_schema.as_ref().clone()); let right_edge = diff --git a/crates/arroyo-planner/src/lib.rs b/crates/arroyo-planner/src/lib.rs index b4cfe3916..dde82baeb 100644 --- a/crates/arroyo-planner/src/lib.rs +++ b/crates/arroyo-planner/src/lib.rs @@ -72,7 +72,7 @@ use datafusion::logical_expr::expr_rewriter::FunctionRewrite; use std::time::{Duration, SystemTime}; use std::{collections::HashMap, sync::Arc}; use syn::Item; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use unicase::UniCase; const DEFAULT_IDLE_TIME: Option = Some(Duration::from_secs(5 * 60)); @@ -553,6 +553,8 @@ pub async fn parse_and_get_arrow_program( let plan_rewrite = rewrite_plan(plan, &schema_provider)?; + debug!("Plan = {:?}", plan_rewrite); + let mut metadata = SourceMetadataVisitor::new(&schema_provider); plan_rewrite.visit(&mut metadata)?; used_connections.extend(metadata.connection_ids.iter()); diff --git a/crates/arroyo-planner/src/plan/join.rs b/crates/arroyo-planner/src/plan/join.rs index 514ada02c..54bd68027 100644 --- a/crates/arroyo-planner/src/plan/join.rs +++ b/crates/arroyo-planner/src/plan/join.rs @@ -83,6 +83,7 @@ impl JoinRewriter { name: &'static str, ) -> Result { let key_count = join_expressions.len(); + let mut join_expressions: Vec<_> = join_expressions .into_iter() .enumerate() @@ -263,6 +264,10 @@ impl TreeNodeRewriter for JoinRewriter { }; Self::check_updating(&left, &right)?; + if on.is_empty() && !is_instant { + return not_impl_err!("Updating joins must include an equijoin condition"); + } + let (left_expressions, right_expressions): (Vec<_>, Vec<_>) = on.clone().into_iter().unzip(); diff --git a/crates/arroyo-planner/src/tables.rs b/crates/arroyo-planner/src/tables.rs index 39f789be0..8eae70d1e 100644 --- a/crates/arroyo-planner/src/tables.rs +++ b/crates/arroyo-planner/src/tables.rs @@ -5,6 +5,13 @@ use std::{collections::HashMap, time::Duration}; use arrow_schema::{DataType, Field, FieldRef, Schema}; use arroyo_connectors::connector_for_type; +use crate::extension::remote_table::RemoteTableExtension; +use crate::types::convert_data_type; +use crate::{ + external::{ProcessingMode, SqlSource}, + ArroyoSchemaProvider, +}; +use crate::{rewrite_plan, DEFAULT_IDLE_TIME}; use arroyo_datastream::default_sink; use arroyo_operator::connector::Connection; use arroyo_rpc::api_types::connections::{ @@ -29,6 +36,7 @@ use datafusion::optimizer::eliminate_limit::EliminateLimit; use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion; use datafusion::optimizer::eliminate_one_union::EliminateOneUnion; use datafusion::optimizer::eliminate_outer_join::EliminateOuterJoin; +use datafusion::optimizer::extract_equijoin_predicate::ExtractEquijoinPredicate; use datafusion::optimizer::filter_null_join_keys::FilterNullJoinKeys; use datafusion::optimizer::propagate_empty_relation::PropagateEmptyRelation; use datafusion::optimizer::push_down_filter::PushDownFilter; @@ -50,14 +58,6 @@ use datafusion::{ }, }; -use crate::extension::remote_table::RemoteTableExtension; -use crate::types::convert_data_type; -use crate::{ - external::{ProcessingMode, SqlSource}, - ArroyoSchemaProvider, -}; -use crate::{rewrite_plan, DEFAULT_IDLE_TIME}; - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectorTable { pub id: Option, @@ -124,8 +124,7 @@ fn produce_optimized_plan( Arc::new(EliminateJoin::new()), Arc::new(DecorrelatePredicateSubquery::new()), Arc::new(ScalarSubqueryToJoin::new()), - // Breaks window joins - // Arc::new(ExtractEquijoinPredicate::new()), + Arc::new(ExtractEquijoinPredicate::new()), Arc::new(SimplifyExpressions::new()), Arc::new(RewriteDisjunctivePredicate::new()), Arc::new(EliminateDuplicatedExpr::new()), diff --git a/crates/arroyo-planner/src/test/queries/updating_filter_join.sql b/crates/arroyo-planner/src/test/queries/updating_filter_join.sql new file mode 100644 index 000000000..d38040fdb --- /dev/null +++ b/crates/arroyo-planner/src/test/queries/updating_filter_join.sql @@ -0,0 +1,30 @@ +--fail=Updating joins must include an equijoin condition + +CREATE TABLE cars ( + timestamp TIMESTAMP, + car_id TEXT, + driver_id BIGINT, + event_type TEXT, + location TEXT +) WITH ( + connector = 'single_file', + path = '$input_dir/cars.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' +); + +CREATE TABLE passengers ( + timestamp TIMESTAMP, + passenger_id BIGINT +) WITH ( + connector = 'single_file', + path = '$input_dir/cars.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' +); + +select passenger_id, car_id +from passengers +join cars ON passenger_id < car_id; \ No newline at end of file diff --git a/crates/arroyo-planner/src/test/queries/windowed_inner_join.sql b/crates/arroyo-planner/src/test/queries/windowed_inner_join.sql index d37d2600b..5d0899385 100644 --- a/crates/arroyo-planner/src/test/queries/windowed_inner_join.sql +++ b/crates/arroyo-planner/src/test/queries/windowed_inner_join.sql @@ -33,4 +33,4 @@ INNER JOIN ( COUNT(distinct driver_id) as pickup_drivers FROM cars where event_type = 'pickup' GROUP BY 1 ) pickups -ON dropoffs.window.start = pickups.window.start) \ No newline at end of file +ON dropoffs.window = pickups.window) \ No newline at end of file diff --git a/crates/arroyo-sql-testing/golden_outputs/updating_inner_join.json b/crates/arroyo-sql-testing/golden_outputs/updating_inner_join.json index bb9e76da7..6d27dbf12 100644 --- a/crates/arroyo-sql-testing/golden_outputs/updating_inner_join.json +++ b/crates/arroyo-sql-testing/golden_outputs/updating_inner_join.json @@ -1,4 +1,50 @@ -{"before":null,"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"c"} -{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"} -{"before":null,"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"c"} -{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"} +{"before":null,"after":{"left_count":1,"right_count":1},"op":"c"} +{"before":null,"after":{"left_count":3,"right_count":3},"op":"c"} +{"before":null,"after":{"left_count":5,"right_count":5},"op":"c"} +{"before":null,"after":{"left_count":7,"right_count":7},"op":"c"} +{"before":null,"after":{"left_count":9,"right_count":9},"op":"c"} +{"before":null,"after":{"left_count":11,"right_count":11},"op":"c"} +{"before":null,"after":{"left_count":13,"right_count":13},"op":"c"} +{"before":null,"after":{"left_count":15,"right_count":15},"op":"c"} +{"before":null,"after":{"left_count":17,"right_count":17},"op":"c"} +{"before":null,"after":{"left_count":19,"right_count":19},"op":"c"} +{"before":null,"after":{"left_count":21,"right_count":21},"op":"c"} +{"before":null,"after":{"left_count":23,"right_count":23},"op":"c"} +{"before":null,"after":{"left_count":25,"right_count":25},"op":"c"} +{"before":null,"after":{"left_count":27,"right_count":27},"op":"c"} +{"before":null,"after":{"left_count":29,"right_count":29},"op":"c"} +{"before":null,"after":{"left_count":31,"right_count":31},"op":"c"} +{"before":null,"after":{"left_count":33,"right_count":33},"op":"c"} +{"before":null,"after":{"left_count":35,"right_count":35},"op":"c"} +{"before":null,"after":{"left_count":37,"right_count":37},"op":"c"} +{"before":null,"after":{"left_count":39,"right_count":39},"op":"c"} +{"before":null,"after":{"left_count":41,"right_count":41},"op":"c"} +{"before":null,"after":{"left_count":43,"right_count":43},"op":"c"} +{"before":null,"after":{"left_count":45,"right_count":45},"op":"c"} +{"before":null,"after":{"left_count":47,"right_count":47},"op":"c"} +{"before":null,"after":{"left_count":49,"right_count":49},"op":"c"} +{"before":null,"after":{"left_count":51,"right_count":51},"op":"c"} +{"before":null,"after":{"left_count":53,"right_count":53},"op":"c"} +{"before":null,"after":{"left_count":55,"right_count":55},"op":"c"} +{"before":null,"after":{"left_count":57,"right_count":57},"op":"c"} +{"before":null,"after":{"left_count":59,"right_count":59},"op":"c"} +{"before":null,"after":{"left_count":61,"right_count":61},"op":"c"} +{"before":null,"after":{"left_count":63,"right_count":63},"op":"c"} +{"before":null,"after":{"left_count":65,"right_count":65},"op":"c"} +{"before":null,"after":{"left_count":67,"right_count":67},"op":"c"} +{"before":null,"after":{"left_count":69,"right_count":69},"op":"c"} +{"before":null,"after":{"left_count":71,"right_count":71},"op":"c"} +{"before":null,"after":{"left_count":73,"right_count":73},"op":"c"} +{"before":null,"after":{"left_count":75,"right_count":75},"op":"c"} +{"before":null,"after":{"left_count":77,"right_count":77},"op":"c"} +{"before":null,"after":{"left_count":79,"right_count":79},"op":"c"} +{"before":null,"after":{"left_count":81,"right_count":81},"op":"c"} +{"before":null,"after":{"left_count":83,"right_count":83},"op":"c"} +{"before":null,"after":{"left_count":85,"right_count":85},"op":"c"} +{"before":null,"after":{"left_count":87,"right_count":87},"op":"c"} +{"before":null,"after":{"left_count":89,"right_count":89},"op":"c"} +{"before":null,"after":{"left_count":91,"right_count":91},"op":"c"} +{"before":null,"after":{"left_count":93,"right_count":93},"op":"c"} +{"before":null,"after":{"left_count":95,"right_count":95},"op":"c"} +{"before":null,"after":{"left_count":97,"right_count":97},"op":"c"} +{"before":null,"after":{"left_count":99,"right_count":99},"op":"c"} \ No newline at end of file diff --git a/crates/arroyo-sql-testing/src/test/queries/updating_inner_join.sql b/crates/arroyo-sql-testing/src/test/queries/updating_inner_join.sql index b9078afd6..8db556ee1 100644 --- a/crates/arroyo-sql-testing/src/test/queries/updating_inner_join.sql +++ b/crates/arroyo-sql-testing/src/test/queries/updating_inner_join.sql @@ -1,29 +1,32 @@ ---fail=Error during planning: can't handle updating right side of join CREATE TABLE impulse ( - timestamp TIMESTAMP, - counter bigint unsigned not null, - subtask_index bigint unsigned not null - ) WITH ( - connector = 'single_file', - path = '$input_dir/impulse.json', - format = 'json', - type = 'source', - event_time_field = 'timestamp' - ); + timestamp TIMESTAMP, + counter bigint unsigned not null, + subtask_index bigint unsigned not null +) WITH ( + connector = 'single_file', + path = '$input_dir/impulse.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' +); - CREATE TABLE output ( - left_counter bigint, - counter_mod_2 bigint, - right_count bigint - ) WITH ( - connector = 'single_file', - path = '$output_path', - format = 'debezium_json', - type = 'sink' - ); +CREATE VIEW impulse_odd AS ( + SELECT * FROM impulse + WHERE counter % 2 == 1 +); - INSERT INTO output - select counter as left_counter, counter_mod_2, right_count from impulse inner join - (select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1) - on counter = right_count where counter < 3; \ No newline at end of file +CREATE TABLE output ( + left_count bigint, + right_count bigint +) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'debezium_json', + type = 'sink' +); + +INSERT INTO output +SELECT A.counter, B.counter +FROM impulse A +JOIN impulse_odd B ON A.counter = B.counter; \ No newline at end of file diff --git a/crates/arroyo-sql-testing/src/test/queries/updating_inner_join_with_updating.sql b/crates/arroyo-sql-testing/src/test/queries/updating_inner_join_with_updating.sql new file mode 100644 index 000000000..82f93915a --- /dev/null +++ b/crates/arroyo-sql-testing/src/test/queries/updating_inner_join_with_updating.sql @@ -0,0 +1,29 @@ +--fail=Error during planning: can't handle updating right side of join +CREATE TABLE impulse ( + timestamp TIMESTAMP, + counter bigint unsigned not null, + subtask_index bigint unsigned not null + ) WITH ( + connector = 'single_file', + path = '$input_dir/impulse.json', + format = 'json', + type = 'source', + event_time_field = 'timestamp' + ); + + + CREATE TABLE output ( + left_counter bigint, + counter_mod_2 bigint, + right_count bigint + ) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'debezium_json', + type = 'sink' + ); + +INSERT INTO output +select counter as left_counter, counter_mod_2, right_count from impulse inner join + (select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1) + on counter = right_count where counter < 3; \ No newline at end of file diff --git a/crates/arroyo-sql-testing/src/test/queries/windowed_inner_join.sql b/crates/arroyo-sql-testing/src/test/queries/windowed_inner_join.sql index b127197ab..1a74b7384 100644 --- a/crates/arroyo-sql-testing/src/test/queries/windowed_inner_join.sql +++ b/crates/arroyo-sql-testing/src/test/queries/windowed_inner_join.sql @@ -33,4 +33,4 @@ INNER JOIN ( COUNT(distinct driver_id) as pickup_drivers FROM cars where event_type = 'pickup' GROUP BY 1 ) pickups -ON dropoffs.window.start = pickups.window.start) \ No newline at end of file +ON dropoffs.window = pickups.window) \ No newline at end of file