
Clean up hash_join's ExecutionPlan::execute #15418


Closed · 6 commits

Conversation

@ctsk (Contributor) commented Mar 25, 2025

Rationale for this change + What changes are included in this PR?

The logic of HashJoin's execute differs from the other operators: the recursive call to execute on the build side is delayed until collect_left_join is polled. This PR changes that to bring it in line with the standard implementations of execute.
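
As an illustrative sketch of the shape of the change (the collect_build_side helper and the surrounding glue are hypothetical stand-ins, not the PR's literal diff; OnceFut is the deferred-future wrapper used by the join operators):

    // Before: the build side's execute() is itself deferred and only
    // runs when the join's output stream is first polled.
    let left = Arc::clone(&self.left);
    let ctx = Arc::clone(&context);
    let build_fut = OnceFut::new(async move {
        let stream = left.execute(0, ctx)?;   // runs on first poll
        collect_build_side(stream).await      // hypothetical helper
    });

    // After: execute() is called eagerly, as other operators do. It only
    // constructs a lazy stream; no batches are produced until polling.
    let left_stream = self.left.execute(0, Arc::clone(&context))?;
    let build_fut = OnceFut::new(collect_build_side(left_stream));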

Are these changes tested?

No changes in behaviour are expected.

Are there any user-facing changes?

No

@ctsk (Contributor, Author) commented Mar 25, 2025

I think similar changes can be made to cross_join and nested_loop_join if desired.

@comphead (Contributor) left a comment

lgtm thanks @ctsk

@berkaysynnada (Contributor) left a comment

Thank you @ctsk. I believe we can merge this as is. However, I'd like to raise one thing that comes to mind whenever I look at this code: I'm not very comfortable with adding a CoalescePartitions after calling execute(). Modifying the plan post-execute() feels a bit off to me. Does it seem like a smell to you as well?

@comphead (Contributor) commented

> Thank you @ctsk. I believe we can merge this as is. However, I'd like to raise one thing that comes to mind whenever I look at this code: I'm not very comfortable with adding a CoalescePartitions after calling execute(). Modifying the plan post-execute() feels a bit off to me. Does it seem like a smell to you as well?

🤔

    let left = coalesce_partitions_if_needed(Arc::clone(&self.left));
    let left_stream = left.execute(0, Arc::clone(&context))?;

I think the coalesce is added before execute?

However, I just noticed the execute currently happens on the main thread instead of inside future::once. But it shouldn't be an issue since the stream is lazily evaluated? WDYT @berkaysynnada
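
For the laziness question, a minimal sketch of the contract (plan and task_ctx are placeholders): execute() only constructs a SendableRecordBatchStream, and the operator does no work until that stream is polled.

    use futures::StreamExt;

    // Cheap: builds the stream, runs none of the operator's work yet.
    let mut stream = plan.execute(0, task_ctx)?;

    // The actual computation happens here, batch by batch, as it is polled.
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        println!("got {} rows", batch.num_rows());
    }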

@ctsk (Contributor, Author) commented Mar 28, 2025

> Modifying the plan post-execute() feels a bit off to me. Does it seem like a smell to you as well?

This should not trigger for physical plans generated by DataFusion, since the EnforceDistribution pass already adds that CoalescePartitionsExec.

@berkaysynnada (Contributor) commented

> I think the coalesce is added before execute?
>
> However, I just noticed the execute currently happens on the main thread instead of inside future::once. But it shouldn't be an issue since the stream is lazily evaluated? WDYT @berkaysynnada

I mean HashJoin::execute()

@berkaysynnada (Contributor) commented

> This should not trigger for physical plans generated by DataFusion, since the EnforceDistribution pass already adds that CoalescePartitionsExec.

You mean the coalesce_partitions_if_needed() call is redundant in DataFusion? I don't think that's the case, but if it is, why don't we remove that line?

@ctsk (Contributor, Author) commented Mar 28, 2025

> You mean the coalesce_partitions_if_needed() call is redundant in DataFusion? I don't think that's the case, but if it is, why don't we remove that line?

I wanted to keep the PR simple and just refactor. Here is a follow-up that removes it: #15476. Let's see if the tests pass :)

EnforceDistribution adds it here:

    fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
        // Add SortPreservingMerge only when partition count is larger than 1.
        if input.plan.output_partitioning().partition_count() > 1 {
            // When there is an existing ordering, we preserve ordering
            // when decreasing partitions. This will be un-done in the future
            // if any of the following conditions is true
            // - Preserving ordering is not helpful in terms of satisfying ordering requirements
            // - Usage of order preserving variants is not desirable
            //   (determined by flag `config.optimizer.bounded_order_preserving_variants`)
            let should_preserve_ordering = input.plan.output_ordering().is_some();
            let new_plan = if should_preserve_ordering {
                Arc::new(SortPreservingMergeExec::new(
                    input
                        .plan
                        .output_ordering()
                        .unwrap_or(&LexOrdering::default())
                        .clone(),
                    Arc::clone(&input.plan),
                )) as _
            } else {
                Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
            };
            DistributionContext::new(new_plan, true, vec![input])
        } else {
            input
        }
    }

add_spm_on_top gets called when the child operator has the SinglePartition requirement

    match &requirement {
        Distribution::SinglePartition => {
            child = add_spm_on_top(child);
        }

This applies to a CollectLeft hash join's left child:

    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
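
For reference, a plausible sketch of the helper in question (not the verbatim DataFusion source): it wraps the build side in a CoalescePartitionsExec only when that side still produces more than one partition, which is why it becomes a no-op once EnforceDistribution has already inserted the coalesce.

    fn coalesce_partitions_if_needed(
        plan: Arc<dyn ExecutionPlan>,
    ) -> Arc<dyn ExecutionPlan> {
        if plan.output_partitioning().partition_count() == 1 {
            plan
        } else {
            Arc::new(CoalescePartitionsExec::new(plan))
        }
    }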

@ctsk (Contributor, Author) commented Mar 29, 2025

Closed in favor of #15476
