Skip to content

Commit 702a5a0

Browse files
committed
Handle join column expansion during USING JOIN planning
Get rid of the shared field and move the column expansion logic into the plan builder and optimizer.
1 parent c97e550 commit 702a5a0

File tree

17 files changed

+481
-421
lines changed

17 files changed

+481
-421
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,6 @@ message HashJoinExecNode {
576576
PhysicalPlanNode right = 2;
577577
repeated JoinOn on = 3;
578578
JoinType join_type = 4;
579-
JoinConstraint join_constraint = 5;
580579
}
581580

582581
message PhysicalColumn {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
267267
))
268268
})?;
269269

270-
let builder =
271-
LogicalPlanBuilder::from(&convert_box_required!(join.left)?);
272-
270+
let builder = LogicalPlanBuilder::from(convert_box_required!(join.left)?);
273271
let builder = match join_constraint.into() {
274272
JoinConstraint::On => builder.join(
275273
&convert_box_required!(join.right)?,

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -350,21 +350,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
350350
))
351351
})?;
352352

353-
let join_constraint =
354-
protobuf::JoinConstraint::from_i32(hashjoin.join_constraint)
355-
.ok_or_else(|| {
356-
proto_error(format!(
357-
"Received a HashJoinNode message with unknown JoinConstraint {}",
358-
hashjoin.join_constraint,
359-
))
360-
})?;
361-
362353
Ok(Arc::new(HashJoinExec::try_new(
363354
left,
364355
right,
365356
on,
366357
&join_type.into(),
367-
join_constraint.into(),
368358
PartitionMode::CollectLeft,
369359
)?))
370360
}

ballista/rust/core/src/serde/physical_plan/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ mod roundtrip_tests {
2727
compute::kernels::sort::SortOptions,
2828
datatypes::{DataType, Field, Schema},
2929
},
30-
logical_plan::{JoinConstraint, JoinType, Operator},
30+
logical_plan::{JoinType, Operator},
3131
physical_plan::{
3232
empty::EmptyExec,
3333
expressions::{binary, col, lit, InListExpr, NotExpr},
@@ -92,7 +92,6 @@ mod roundtrip_tests {
9292
Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
9393
on,
9494
&JoinType::Inner,
95-
JoinConstraint::On,
9695
PartitionMode::CollectLeft,
9796
)?))
9897
}

ballista/rust/core/src/serde/physical_plan/to_proto.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
136136
})
137137
.collect();
138138
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
139-
let join_constraint: protobuf::JoinConstraint = exec.join_constraint().into();
140139

141140
Ok(protobuf::PhysicalPlanNode {
142141
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
@@ -145,7 +144,6 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
145144
right: Some(Box::new(right)),
146145
on,
147146
join_type: join_type.into(),
148-
join_constraint: join_constraint.into(),
149147
},
150148
))),
151149
})

datafusion/src/logical_plan/builder.rs

Lines changed: 18 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use crate::logical_plan::{
4040
columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
4141
DFSchemaRef, Partitioning,
4242
};
43-
use std::collections::HashSet;
4443

4544
/// Default table name for unnamed table
4645
pub const UNNAMED_TABLE: &str = "?table?";
@@ -213,7 +212,6 @@ impl LogicalPlanBuilder {
213212
/// * An invalid expression is used (e.g. a `sort` expression)
214213
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
215214
let input_schema = self.plan.schema();
216-
let all_schemas = self.plan.all_schemas();
217215
let mut projected_expr = vec![];
218216
for e in expr {
219217
match e {
@@ -223,10 +221,8 @@ impl LogicalPlanBuilder {
223221
.push(Expr::Column(input_schema.field(i).qualified_column()))
224222
});
225223
}
226-
_ => projected_expr.push(columnize_expr(
227-
normalize_col(e, &all_schemas)?,
228-
input_schema,
229-
)),
224+
_ => projected_expr
225+
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
230226
}
231227
}
232228

@@ -243,7 +239,7 @@ impl LogicalPlanBuilder {
243239

244240
/// Apply a filter
245241
pub fn filter(&self, expr: Expr) -> Result<Self> {
246-
let expr = normalize_col(expr, &self.plan.all_schemas())?;
242+
let expr = normalize_col(expr, &self.plan)?;
247243
Ok(Self::from(LogicalPlan::Filter {
248244
predicate: expr,
249245
input: Arc::new(self.plan.clone()),
@@ -260,9 +256,8 @@ impl LogicalPlanBuilder {
260256

261257
/// Apply a sort
262258
pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> {
263-
let schemas = self.plan.all_schemas();
264259
Ok(Self::from(LogicalPlan::Sort {
265-
expr: normalize_cols(exprs, &schemas)?,
260+
expr: normalize_cols(exprs, &self.plan)?,
266261
input: Arc::new(self.plan.clone()),
267262
}))
268263
}
@@ -288,20 +283,15 @@ impl LogicalPlanBuilder {
288283

289284
let left_keys: Vec<Column> = left_keys
290285
.into_iter()
291-
.map(|c| c.into().normalize(&self.plan.all_schemas()))
286+
.map(|c| c.into().normalize(&self.plan))
292287
.collect::<Result<_>>()?;
293288
let right_keys: Vec<Column> = right_keys
294289
.into_iter()
295-
.map(|c| c.into().normalize(&right.all_schemas()))
290+
.map(|c| c.into().normalize(right))
296291
.collect::<Result<_>>()?;
297292
let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
298-
let join_schema = build_join_schema(
299-
self.plan.schema(),
300-
right.schema(),
301-
&on,
302-
&join_type,
303-
&JoinConstraint::On,
304-
)?;
293+
let join_schema =
294+
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
305295

306296
Ok(Self::from(LogicalPlan::Join {
307297
left: Arc::new(self.plan.clone()),
@@ -323,21 +313,16 @@ impl LogicalPlanBuilder {
323313
let left_keys: Vec<Column> = using_keys
324314
.clone()
325315
.into_iter()
326-
.map(|c| c.into().normalize(&self.plan.all_schemas()))
316+
.map(|c| c.into().normalize(&self.plan))
327317
.collect::<Result<_>>()?;
328318
let right_keys: Vec<Column> = using_keys
329319
.into_iter()
330-
.map(|c| c.into().normalize(&right.all_schemas()))
320+
.map(|c| c.into().normalize(right))
331321
.collect::<Result<_>>()?;
332322

333323
let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
334-
let join_schema = build_join_schema(
335-
self.plan.schema(),
336-
right.schema(),
337-
&on,
338-
&join_type,
339-
&JoinConstraint::Using,
340-
)?;
324+
let join_schema =
325+
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
341326

342327
Ok(Self::from(LogicalPlan::Join {
343328
left: Arc::new(self.plan.clone()),
@@ -390,9 +375,8 @@ impl LogicalPlanBuilder {
390375
group_expr: impl IntoIterator<Item = Expr>,
391376
aggr_expr: impl IntoIterator<Item = Expr>,
392377
) -> Result<Self> {
393-
let schemas = self.plan.all_schemas();
394-
let group_expr = normalize_cols(group_expr, &schemas)?;
395-
let aggr_expr = normalize_cols(aggr_expr, &schemas)?;
378+
let group_expr = normalize_cols(group_expr, &self.plan)?;
379+
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
396380
let all_expr = group_expr.iter().chain(aggr_expr.iter());
397381

398382
validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?;
@@ -436,53 +420,14 @@ impl LogicalPlanBuilder {
436420
pub fn build_join_schema(
437421
left: &DFSchema,
438422
right: &DFSchema,
439-
on: &[(Column, Column)],
440423
join_type: &JoinType,
441-
join_constraint: &JoinConstraint,
442424
) -> Result<DFSchema> {
443425
let fields: Vec<DFField> = match join_type {
444426
JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
445-
match join_constraint {
446-
JoinConstraint::On => {
447-
let right_fields = right.fields().iter();
448-
let left_fields = left.fields().iter();
449-
// left then right
450-
left_fields.chain(right_fields).cloned().collect()
451-
}
452-
JoinConstraint::Using => {
453-
// using join requires unique join column in the output schema, so we mark all
454-
// right join keys as duplicate
455-
let duplicate_join_names =
456-
on.iter().map(|on| &on.1.name).collect::<HashSet<_>>();
457-
458-
let right_fields = right
459-
.fields()
460-
.iter()
461-
.filter(|f| !duplicate_join_names.contains(f.name()))
462-
.cloned();
463-
464-
let left_fields = left.fields().iter().map(|f| {
465-
for key in on.iter() {
466-
// update qualifiers for shared fields
467-
if duplicate_join_names.contains(f.name()) {
468-
let mut hs = HashSet::new();
469-
if let Some(q) = &key.0.relation {
470-
hs.insert(q.to_string());
471-
}
472-
if let Some(q) = &key.1.relation {
473-
hs.insert(q.to_string());
474-
}
475-
return f.clone().set_shared_qualifiers(hs);
476-
}
477-
}
478-
479-
f.clone()
480-
});
481-
482-
// left then right
483-
left_fields.chain(right_fields).collect()
484-
}
485-
}
427+
let right_fields = right.fields().iter();
428+
let left_fields = left.fields().iter();
429+
// left then right
430+
left_fields.chain(right_fields).cloned().collect()
486431
}
487432
JoinType::Semi | JoinType::Anti => {
488433
// Only use the left side for the schema

datafusion/src/logical_plan/dfschema.rs

Lines changed: 11 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ impl DFSchema {
5757
field.qualified_name()
5858
)));
5959
}
60-
} else if let Some(shared_qualifiers) = field.shared_qualifiers() {
61-
for qualifier in shared_qualifiers {
62-
if !qualified_names.insert((qualifier, field.name())) {
63-
return Err(DataFusionError::Plan(format!(
64-
"Schema contains duplicate qualified field name '{}'",
65-
field.qualified_name()
66-
)));
67-
}
68-
}
6960
} else if !unqualified_names.insert(field.name()) {
7061
return Err(DataFusionError::Plan(format!(
7162
"Schema contains duplicate unqualified field name '{}'",
@@ -170,18 +161,8 @@ impl DFSchema {
170161
// current field is qualified and not shared between relations, compare both
171162
// qualifier and name.
172163
(Some(q), Some(field_q)) => q == field_q && field.name() == name,
173-
// field to lookup is qualified.
174-
// current field is either unqualified or qualified and shared between relations.
175-
(Some(q), None) => {
176-
if let Some(shared_q) = field.shared_qualifiers() {
177-
// current field is a shared qualified field, check for all shared
178-
// relation names.
179-
shared_q.contains(q) && field.name() == name
180-
} else {
181-
// current field is unqualifiied
182-
false
183-
}
184-
}
164+
// field to lookup is qualified but current field is unqualified.
165+
(Some(_), None) => false,
185166
// field to lookup is unqualified, no need to compare qualifier
186167
_ => field.name() == name,
187168
})
@@ -222,13 +203,17 @@ impl DFSchema {
222203
}
223204
}
224205

225-
/// Find the field with the given name
226-
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
227-
let matches: Vec<&DFField> = self
228-
.fields
206+
/// Find all fields that match the given unqualified name
207+
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
208+
self.fields
229209
.iter()
230210
.filter(|field| field.name() == name)
231-
.collect();
211+
.collect()
212+
}
213+
214+
/// Find the field with the given name
215+
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
216+
let matches = self.fields_with_unqualified_name(name);
232217
match matches.len() {
233218
0 => Err(DataFusionError::Plan(format!(
234219
"No field with unqualified name '{}'. Valid fields are {}.",
@@ -417,9 +402,6 @@ impl Display for DFSchema {
417402
pub struct DFField {
418403
/// Optional qualifier (usually a table or relation name)
419404
qualifier: Option<String>,
420-
/// Optional set of qualifiers that all share this same field. This is used for `JOIN USING`
421-
/// clause where the join keys are combined into a shared column.
422-
shared_qualifiers: Option<HashSet<String>>,
423405
/// Arrow field definition
424406
field: Field,
425407
}
@@ -434,7 +416,6 @@ impl DFField {
434416
) -> Self {
435417
DFField {
436418
qualifier: qualifier.map(|s| s.to_owned()),
437-
shared_qualifiers: None,
438419
field: Field::new(name, data_type, nullable),
439420
}
440421
}
@@ -443,7 +424,6 @@ impl DFField {
443424
pub fn from(field: Field) -> Self {
444425
Self {
445426
qualifier: None,
446-
shared_qualifiers: None,
447427
field,
448428
}
449429
}
@@ -452,7 +432,6 @@ impl DFField {
452432
pub fn from_qualified(qualifier: &str, field: Field) -> Self {
453433
Self {
454434
qualifier: Some(qualifier.to_owned()),
455-
shared_qualifiers: None,
456435
field,
457436
}
458437
}
@@ -502,11 +481,6 @@ impl DFField {
502481
self.qualifier.as_ref()
503482
}
504483

505-
/// Get the optional qualifier
506-
pub fn shared_qualifiers(&self) -> Option<&HashSet<String>> {
507-
self.shared_qualifiers.as_ref()
508-
}
509-
510484
/// Get the arrow field
511485
pub fn field(&self) -> &Field {
512486
&self.field
@@ -517,13 +491,6 @@ impl DFField {
517491
self.qualifier = None;
518492
self
519493
}
520-
521-
/// Return field with shared qualifiers set and qualifier stripped
522-
pub fn set_shared_qualifiers(mut self, shared_qualifiers: HashSet<String>) -> Self {
523-
self.qualifier = None;
524-
self.shared_qualifiers = Some(shared_qualifiers);
525-
self
526-
}
527494
}
528495

529496
#[cfg(test)]

0 commit comments

Comments
 (0)