-
Notifications
You must be signed in to change notification settings - Fork 155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: Remove one redundant CopyExec for SMJ #962
Conversation
is_sort_merge: bool, | ||
plan: Arc<dyn ExecutionPlan>, | ||
) -> Arc<dyn ExecutionPlan> { | ||
if is_sort_merge { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a more general description for SMJ's property to name this variable, and then we could use this function for the duplicated code elsewhere in this file?
let copy_exec = if can_reuse_input_batch(&child) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will refactor this to reduce duplication
edit: I ran the wrong script 🤦 |
A single run of TPC-DS comparing main to this PR does not show a significant difference (~1% difference). I did not expect this to make much difference, but it does remove a redundant operator and simplified the plan, |
// DataFusion Join operators keep the input batch internally. We need | ||
// to copy the input batch to avoid the data corruption from reusing the input | ||
// batch. | ||
let left = if can_reuse_input_batch(&left) { | ||
Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy)) | ||
} else { | ||
Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone)) | ||
}; | ||
|
||
let right = if can_reuse_input_batch(&right) { | ||
Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy)) | ||
} else { | ||
Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone)) | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is specific to HashJoinExec and not to SortMergeJoinExec, so I moved it out of this code, which is common to both joins.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This a great catch, and a super clean refactor.
} else { | ||
Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone)) | ||
}; | ||
let child = Self::wrap_in_copy_exec(child); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a similar comment here as you added with HashJoin on why Sort needs a CopyExec?
Which issue does this PR close?
N/A
Rationale for this change
We were injecting too many CopyExecs for SMJ. Plans looked like this:
What changes are included in this PR?
With the changes in this PR, the plan now looks like:
How are these changes tested?
Existing tests