Skip to content

Commit 57eeb64

Browse files
authored
[ARROW-12441] [DataFusion] Cross join implementation (#11)
* Cross join implementation
* Add to ballista, debug line
* Add to tpch test, format
* Simplify a bit
* Row-by-row processing for the left side to keep memory down
* Fix
* Fmt
* Clippy
* Fix doc, don't include as much debug info in memoryexec debug
* Use join
* Fix doc
* Add test cases with partitions
* Make clear that mutex is locked for very short amount of time
* Unwrap the lock
1 parent 395d9d6 commit 57eeb64

File tree

17 files changed

+479
-26
lines changed

17 files changed

+479
-26
lines changed

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
940940
}
941941
LogicalPlan::Extension { .. } => unimplemented!(),
942942
LogicalPlan::Union { .. } => unimplemented!(),
943+
LogicalPlan::CrossJoin { .. } => unimplemented!(),
943944
}
944945
}
945946
}

benchmarks/src/bin/tpch.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,11 @@ mod tests {
13741374
run_query(6).await
13751375
}
13761376

1377+
#[tokio::test]
1378+
async fn run_q9() -> Result<()> {
1379+
run_query(9).await
1380+
}
1381+
13771382
#[tokio::test]
13781383
async fn run_q10() -> Result<()> {
13791384
run_query(10).await

datafusion/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
213213
- [ ] MINUS
214214
- [x] Joins
215215
- [x] INNER JOIN
216-
- [ ] CROSS JOIN
216+
- [x] LEFT JOIN
217+
- [x] RIGHT JOIN
218+
- [x] CROSS JOIN
217219
- [ ] OUTER JOIN
218220
- [ ] Window
219221

datafusion/src/logical_plan/builder.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,16 @@ impl LogicalPlanBuilder {
270270
}))
271271
}
272272
}
273+
/// Apply a cross join
274+
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
275+
let schema = self.plan.schema().join(right.schema())?;
276+
277+
Ok(Self::from(&LogicalPlan::CrossJoin {
278+
left: Arc::new(self.plan.clone()),
279+
right: Arc::new(right.clone()),
280+
schema: DFSchemaRef::new(schema),
281+
}))
282+
}
273283

274284
/// Repartition
275285
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {

datafusion/src/logical_plan/plan.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,15 @@ pub enum LogicalPlan {
113113
/// The output schema, containing fields from the left and right inputs
114114
schema: DFSchemaRef,
115115
},
116+
/// Apply Cross Join to two logical plans
117+
CrossJoin {
118+
/// Left input
119+
left: Arc<LogicalPlan>,
120+
/// Right input
121+
right: Arc<LogicalPlan>,
122+
/// The output schema, containing fields from the left and right inputs
123+
schema: DFSchemaRef,
124+
},
116125
/// Repartition the plan based on a partitioning scheme.
117126
Repartition {
118127
/// The incoming logical plan
@@ -203,6 +212,7 @@ impl LogicalPlan {
203212
LogicalPlan::Aggregate { schema, .. } => &schema,
204213
LogicalPlan::Sort { input, .. } => input.schema(),
205214
LogicalPlan::Join { schema, .. } => &schema,
215+
LogicalPlan::CrossJoin { schema, .. } => &schema,
206216
LogicalPlan::Repartition { input, .. } => input.schema(),
207217
LogicalPlan::Limit { input, .. } => input.schema(),
208218
LogicalPlan::CreateExternalTable { schema, .. } => &schema,
@@ -229,6 +239,11 @@ impl LogicalPlan {
229239
right,
230240
schema,
231241
..
242+
}
243+
| LogicalPlan::CrossJoin {
244+
left,
245+
right,
246+
schema,
232247
} => {
233248
let mut schemas = left.all_schemas();
234249
schemas.extend(right.all_schemas());
@@ -290,8 +305,9 @@ impl LogicalPlan {
290305
| LogicalPlan::EmptyRelation { .. }
291306
| LogicalPlan::Limit { .. }
292307
| LogicalPlan::CreateExternalTable { .. }
293-
| LogicalPlan::Explain { .. } => vec![],
294-
LogicalPlan::Union { .. } => {
308+
| LogicalPlan::CrossJoin { .. }
309+
| LogicalPlan::Explain { .. }
310+
| LogicalPlan::Union { .. } => {
295311
vec![]
296312
}
297313
}
@@ -307,6 +323,7 @@ impl LogicalPlan {
307323
LogicalPlan::Aggregate { input, .. } => vec![input],
308324
LogicalPlan::Sort { input, .. } => vec![input],
309325
LogicalPlan::Join { left, right, .. } => vec![left, right],
326+
LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
310327
LogicalPlan::Limit { input, .. } => vec![input],
311328
LogicalPlan::Extension { node } => node.inputs(),
312329
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
@@ -396,7 +413,8 @@ impl LogicalPlan {
396413
LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
397414
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
398415
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
399-
LogicalPlan::Join { left, right, .. } => {
416+
LogicalPlan::Join { left, right, .. }
417+
| LogicalPlan::CrossJoin { left, right, .. } => {
400418
left.accept(visitor)? && right.accept(visitor)?
401419
}
402420
LogicalPlan::Union { inputs, .. } => {
@@ -669,6 +687,9 @@ impl LogicalPlan {
669687
keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
670688
write!(f, "Join: {}", join_expr.join(", "))
671689
}
690+
LogicalPlan::CrossJoin { .. } => {
691+
write!(f, "CrossJoin:")
692+
}
672693
LogicalPlan::Repartition {
673694
partitioning_scheme,
674695
..

datafusion/src/optimizer/constant_folding.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding {
7272
| LogicalPlan::Explain { .. }
7373
| LogicalPlan::Limit { .. }
7474
| LogicalPlan::Union { .. }
75-
| LogicalPlan::Join { .. } => {
75+
| LogicalPlan::Join { .. }
76+
| LogicalPlan::CrossJoin { .. } => {
7677
// apply the optimization to all inputs of the plan
7778
let inputs = plan.inputs();
7879
let new_inputs = inputs

datafusion/src/optimizer/filter_push_down.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,8 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
314314
.collect::<HashSet<_>>();
315315
issue_filters(state, used_columns, plan)
316316
}
317-
LogicalPlan::Join { left, right, .. } => {
317+
LogicalPlan::Join { left, right, .. }
318+
| LogicalPlan::CrossJoin { left, right, .. } => {
318319
let (pushable_to_left, pushable_to_right, keep) =
319320
get_join_predicates(&state, &left.schema(), &right.schema());
320321

datafusion/src/optimizer/hash_build_probe_order.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
6767
// we cannot predict the cardinality of the join output
6868
None
6969
}
70+
LogicalPlan::CrossJoin { left, right, .. } => {
71+
// number of rows is equal to num_left * num_right
72+
get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y))
73+
}
7074
LogicalPlan::Repartition { .. } => {
7175
// we cannot predict how rows will be repartitioned
7276
None
@@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder {
138142
})
139143
}
140144
}
145+
LogicalPlan::CrossJoin {
146+
left,
147+
right,
148+
schema,
149+
} => {
150+
let left = self.optimize(left)?;
151+
let right = self.optimize(right)?;
152+
if should_swap_join_order(&left, &right) {
153+
// Swap left and right
154+
Ok(LogicalPlan::CrossJoin {
155+
left: Arc::new(right),
156+
right: Arc::new(left),
157+
schema: schema.clone(),
158+
})
159+
} else {
160+
// Keep join as is
161+
Ok(LogicalPlan::CrossJoin {
162+
left: Arc::new(left),
163+
right: Arc::new(right),
164+
schema: schema.clone(),
165+
})
166+
}
167+
}
141168
// Rest: recurse into plan, apply optimization where possible
142169
LogicalPlan::Projection { .. }
143170
| LogicalPlan::Aggregate { .. }

datafusion/src/optimizer/projection_push_down.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ fn optimize_plan(
270270
| LogicalPlan::Sort { .. }
271271
| LogicalPlan::CreateExternalTable { .. }
272272
| LogicalPlan::Union { .. }
273+
| LogicalPlan::CrossJoin { .. }
273274
| LogicalPlan::Extension { .. } => {
274275
let expr = plan.expressions();
275276
// collect all required columns by this plan

datafusion/src/optimizer/utils.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,11 @@ pub fn from_plan(
208208
on: on.clone(),
209209
schema: schema.clone(),
210210
}),
211+
LogicalPlan::CrossJoin { schema, .. } => Ok(LogicalPlan::CrossJoin {
212+
left: Arc::new(inputs[0].clone()),
213+
right: Arc::new(inputs[1].clone()),
214+
schema: schema.clone(),
215+
}),
211216
LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
212217
n: *n,
213218
input: Arc::new(inputs[0].clone()),

0 commit comments

Comments
 (0)