Skip to content

Commit c97e550

Browse files
committed
fix join column handling logic for On and Using constraints
1 parent 27dc5d6 commit c97e550

File tree

20 files changed

+613
-338
lines changed

20 files changed

+613
-338
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,12 +378,18 @@ enum JoinType {
378378
ANTI = 5;
379379
}
380380

381+
enum JoinConstraint {
382+
ON = 0;
383+
USING = 1;
384+
}
385+
381386
message JoinNode {
382387
LogicalPlanNode left = 1;
383388
LogicalPlanNode right = 2;
384389
JoinType join_type = 3;
385-
repeated Column left_join_column = 4;
386-
repeated Column right_join_column = 5;
390+
JoinConstraint join_constraint = 4;
391+
repeated Column left_join_column = 5;
392+
repeated Column right_join_column = 6;
387393
}
388394

389395
message LimitNode {
@@ -570,7 +576,7 @@ message HashJoinExecNode {
570576
PhysicalPlanNode right = 2;
571577
repeated JoinOn on = 3;
572578
JoinType join_type = 4;
573-
579+
JoinConstraint join_constraint = 5;
574580
}
575581

576582
message PhysicalColumn {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use datafusion::logical_plan::window_frames::{
2626
};
2727
use datafusion::logical_plan::{
2828
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
29-
sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinType, LogicalPlan,
30-
LogicalPlanBuilder, Operator,
29+
sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinConstraint, JoinType,
30+
LogicalPlan, LogicalPlanBuilder, Operator,
3131
};
3232
use datafusion::physical_plan::aggregates::AggregateFunction;
3333
use datafusion::physical_plan::csv::CsvReadOptions;
@@ -257,23 +257,34 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
257257
join.join_type
258258
))
259259
})?;
260-
let join_type = match join_type {
261-
protobuf::JoinType::Inner => JoinType::Inner,
262-
protobuf::JoinType::Left => JoinType::Left,
263-
protobuf::JoinType::Right => JoinType::Right,
264-
protobuf::JoinType::Full => JoinType::Full,
265-
protobuf::JoinType::Semi => JoinType::Semi,
266-
protobuf::JoinType::Anti => JoinType::Anti,
267-
};
268-
LogicalPlanBuilder::from(convert_box_required!(join.left)?)
269-
.join(
260+
let join_constraint = protobuf::JoinConstraint::from_i32(
261+
join.join_constraint,
262+
)
263+
.ok_or_else(|| {
264+
proto_error(format!(
265+
"Received a JoinNode message with unknown JoinConstraint {}",
266+
join.join_constraint
267+
))
268+
})?;
269+
270+
let builder =
271+
LogicalPlanBuilder::from(&convert_box_required!(join.left)?);
272+
273+
let builder = match join_constraint.into() {
274+
JoinConstraint::On => builder.join(
270275
&convert_box_required!(join.right)?,
271-
join_type,
276+
join_type.into(),
272277
left_keys,
273278
right_keys,
274-
)?
275-
.build()
276-
.map_err(|e| e.into())
279+
)?,
280+
JoinConstraint::Using => builder.join_using(
281+
&convert_box_required!(join.right)?,
282+
join_type.into(),
283+
left_keys,
284+
)?,
285+
};
286+
287+
builder.build().map_err(|e| e.into())
277288
}
278289
}
279290
}

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUn
2626
use datafusion::datasource::CsvFile;
2727
use datafusion::logical_plan::{
2828
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
29-
Column, Expr, JoinType, LogicalPlan,
29+
Column, Expr, JoinConstraint, JoinType, LogicalPlan,
3030
};
3131
use datafusion::physical_plan::aggregates::AggregateFunction;
3232
use datafusion::physical_plan::functions::BuiltinScalarFunction;
@@ -804,26 +804,23 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
804804
right,
805805
on,
806806
join_type,
807+
join_constraint,
807808
..
808809
} => {
809810
let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?;
810811
let right: protobuf::LogicalPlanNode = right.as_ref().try_into()?;
811-
let join_type = match join_type {
812-
JoinType::Inner => protobuf::JoinType::Inner,
813-
JoinType::Left => protobuf::JoinType::Left,
814-
JoinType::Right => protobuf::JoinType::Right,
815-
JoinType::Full => protobuf::JoinType::Full,
816-
JoinType::Semi => protobuf::JoinType::Semi,
817-
JoinType::Anti => protobuf::JoinType::Anti,
818-
};
819812
let (left_join_column, right_join_column) =
820813
on.iter().map(|(l, r)| (l.into(), r.into())).unzip();
814+
let join_type: protobuf::JoinType = join_type.to_owned().into();
815+
let join_constraint: protobuf::JoinConstraint =
816+
join_constraint.to_owned().into();
821817
Ok(protobuf::LogicalPlanNode {
822818
logical_plan_type: Some(LogicalPlanType::Join(Box::new(
823819
protobuf::JoinNode {
824820
left: Some(Box::new(left)),
825821
right: Some(Box::new(right)),
826822
join_type: join_type.into(),
823+
join_constraint: join_constraint.into(),
827824
left_join_column,
828825
right_join_column,
829826
},

ballista/rust/core/src/serde/mod.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
2121
use std::{convert::TryInto, io::Cursor};
2222

23-
use datafusion::logical_plan::Operator;
23+
use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
2424
use datafusion::physical_plan::aggregates::AggregateFunction;
2525
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
2626

@@ -291,3 +291,47 @@ impl Into<datafusion::arrow::datatypes::DataType> for protobuf::PrimitiveScalarT
291291
}
292292
}
293293
}
294+
295+
impl From<protobuf::JoinType> for JoinType {
296+
fn from(t: protobuf::JoinType) -> Self {
297+
match t {
298+
protobuf::JoinType::Inner => JoinType::Inner,
299+
protobuf::JoinType::Left => JoinType::Left,
300+
protobuf::JoinType::Right => JoinType::Right,
301+
protobuf::JoinType::Full => JoinType::Full,
302+
protobuf::JoinType::Semi => JoinType::Semi,
303+
protobuf::JoinType::Anti => JoinType::Anti,
304+
}
305+
}
306+
}
307+
308+
impl From<JoinType> for protobuf::JoinType {
309+
fn from(t: JoinType) -> Self {
310+
match t {
311+
JoinType::Inner => protobuf::JoinType::Inner,
312+
JoinType::Left => protobuf::JoinType::Left,
313+
JoinType::Right => protobuf::JoinType::Right,
314+
JoinType::Full => protobuf::JoinType::Full,
315+
JoinType::Semi => protobuf::JoinType::Semi,
316+
JoinType::Anti => protobuf::JoinType::Anti,
317+
}
318+
}
319+
}
320+
321+
impl From<protobuf::JoinConstraint> for JoinConstraint {
322+
fn from(t: protobuf::JoinConstraint) -> Self {
323+
match t {
324+
protobuf::JoinConstraint::On => JoinConstraint::On,
325+
protobuf::JoinConstraint::Using => JoinConstraint::Using,
326+
}
327+
}
328+
}
329+
330+
impl From<JoinConstraint> for protobuf::JoinConstraint {
331+
fn from(t: JoinConstraint) -> Self {
332+
match t {
333+
JoinConstraint::On => protobuf::JoinConstraint::On,
334+
JoinConstraint::Using => protobuf::JoinConstraint::Using,
335+
}
336+
}
337+
}

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ use datafusion::catalog::catalog::{
3535
use datafusion::execution::context::{
3636
ExecutionConfig, ExecutionContextState, ExecutionProps,
3737
};
38-
use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr};
38+
use datafusion::logical_plan::{
39+
window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
40+
};
3941
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
4042
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
4143
use datafusion::physical_plan::hash_join::PartitionMode;
@@ -57,7 +59,6 @@ use datafusion::physical_plan::{
5759
filter::FilterExec,
5860
functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
5961
hash_join::HashJoinExec,
60-
hash_utils::JoinType,
6162
limit::{GlobalLimitExec, LocalLimitExec},
6263
parquet::ParquetExec,
6364
projection::ProjectionExec,
@@ -348,19 +349,22 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
348349
hashjoin.join_type
349350
))
350351
})?;
351-
let join_type = match join_type {
352-
protobuf::JoinType::Inner => JoinType::Inner,
353-
protobuf::JoinType::Left => JoinType::Left,
354-
protobuf::JoinType::Right => JoinType::Right,
355-
protobuf::JoinType::Full => JoinType::Full,
356-
protobuf::JoinType::Semi => JoinType::Semi,
357-
protobuf::JoinType::Anti => JoinType::Anti,
358-
};
352+
353+
let join_constraint =
354+
protobuf::JoinConstraint::from_i32(hashjoin.join_constraint)
355+
.ok_or_else(|| {
356+
proto_error(format!(
357+
"Received a HashJoinNode message with unknown JoinConstraint {}",
358+
hashjoin.join_constraint,
359+
))
360+
})?;
361+
359362
Ok(Arc::new(HashJoinExec::try_new(
360363
left,
361364
right,
362365
on,
363-
&join_type,
366+
&join_type.into(),
367+
join_constraint.into(),
364368
PartitionMode::CollectLeft,
365369
)?))
366370
}

ballista/rust/core/src/serde/physical_plan/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,14 @@ mod roundtrip_tests {
2727
compute::kernels::sort::SortOptions,
2828
datatypes::{DataType, Field, Schema},
2929
},
30-
logical_plan::Operator,
30+
logical_plan::{JoinConstraint, JoinType, Operator},
3131
physical_plan::{
3232
empty::EmptyExec,
3333
expressions::{binary, col, lit, InListExpr, NotExpr},
3434
expressions::{Avg, Column, PhysicalSortExpr},
3535
filter::FilterExec,
3636
hash_aggregate::{AggregateMode, HashAggregateExec},
3737
hash_join::{HashJoinExec, PartitionMode},
38-
hash_utils::JoinType,
3938
limit::{GlobalLimitExec, LocalLimitExec},
4039
sort::SortExec,
4140
AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
@@ -93,6 +92,7 @@ mod roundtrip_tests {
9392
Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
9493
on,
9594
&JoinType::Inner,
95+
JoinConstraint::On,
9696
PartitionMode::CollectLeft,
9797
)?))
9898
}

ballista/rust/core/src/serde/physical_plan/to_proto.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::{
2626
sync::Arc,
2727
};
2828

29+
use datafusion::logical_plan::{JoinConstraint, JoinType};
2930
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
3031
use datafusion::physical_plan::csv::CsvExec;
3132
use datafusion::physical_plan::expressions::{
@@ -35,7 +36,6 @@ use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
3536
use datafusion::physical_plan::filter::FilterExec;
3637
use datafusion::physical_plan::hash_aggregate::AggregateMode;
3738
use datafusion::physical_plan::hash_join::HashJoinExec;
38-
use datafusion::physical_plan::hash_utils::JoinType;
3939
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4040
use datafusion::physical_plan::parquet::ParquetExec;
4141
use datafusion::physical_plan::projection::ProjectionExec;
@@ -135,21 +135,17 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
135135
}),
136136
})
137137
.collect();
138-
let join_type = match exec.join_type() {
139-
JoinType::Inner => protobuf::JoinType::Inner,
140-
JoinType::Left => protobuf::JoinType::Left,
141-
JoinType::Right => protobuf::JoinType::Right,
142-
JoinType::Full => protobuf::JoinType::Full,
143-
JoinType::Semi => protobuf::JoinType::Semi,
144-
JoinType::Anti => protobuf::JoinType::Anti,
145-
};
138+
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
139+
let join_constraint: protobuf::JoinConstraint = exec.join_constraint().into();
140+
146141
Ok(protobuf::PhysicalPlanNode {
147142
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
148143
protobuf::HashJoinExecNode {
149144
left: Some(Box::new(left)),
150145
right: Some(Box::new(right)),
151146
on,
152147
join_type: join_type.into(),
148+
join_constraint: join_constraint.into(),
153149
},
154150
))),
155151
})

benchmarks/queries/q7.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,4 @@ group by
3636
order by
3737
supp_nation,
3838
cust_nation,
39-
l_year;
39+
l_year;

0 commit comments

Comments
 (0)