Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Compute pool for native executor #2986

Merged
merged 15 commits into from
Oct 23, 2024
Prev Previous commit
Next Next commit
nits
  • Loading branch information
Colin Ho authored and Colin Ho committed Oct 9, 2024
commit fa8e8bb89a9bb655b8b83b5321a48d8896b77bf1
Original file line number Diff line number Diff line change
Expand Up @@ -102,28 +102,22 @@ impl IntermediateOperator for AntiSemiProbeOperator {
input: &PipelineResultType,
state: &mut dyn IntermediateOperatorState,
) -> DaftResult<IntermediateOperatorResult> {
match idx {
0 => {
let state = state
.as_any_mut()
.downcast_mut::<AntiSemiProbeState>()
.expect("AntiSemiProbeOperator state should be AntiSemiProbeState");
let (probe_table, _) = input.as_probe_table();
state.set_table(probe_table);
Ok(IntermediateOperatorResult::NeedMoreInput(None))
}
_ => {
let state = state
.as_any_mut()
.downcast_mut::<AntiSemiProbeState>()
.expect("AntiSemiProbeOperator state should be AntiSemiProbeState");
let input = input.as_data();
let out = match self.join_type {
JoinType::Semi | JoinType::Anti => self.probe_anti_semi(input, state),
_ => unreachable!("Only Semi and Anti joins are supported"),
}?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(out)))
}
let state = state
.as_any_mut()
.downcast_mut::<AntiSemiProbeState>()
.expect("AntiSemiProbeOperator state should be AntiSemiProbeState");

if idx == 0 {
let (probe_table, _) = input.as_probe_table();
state.set_table(probe_table);
Ok(IntermediateOperatorResult::NeedMoreInput(None))
} else {
let input = input.as_data();
let out = match self.join_type {
JoinType::Semi | JoinType::Anti => self.probe_anti_semi(input, state),
_ => unreachable!("Only Semi and Anti joins are supported"),
}?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(out)))
}
}

Expand Down
46 changes: 21 additions & 25 deletions src/daft-local-execution/src/intermediate_ops/hash_join_probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,31 +233,27 @@ impl IntermediateOperator for HashJoinProbeOperator {
input: &PipelineResultType,
state: &mut dyn IntermediateOperatorState,
) -> DaftResult<IntermediateOperatorResult> {
match idx {
0 => {
let state = state
.as_any_mut()
.downcast_mut::<HashJoinProbeState>()
.expect("HashJoinProbeOperator state should be HashJoinProbeState");
let (probe_table, tables) = input.as_probe_table();
state.set_table(probe_table, tables);
Ok(IntermediateOperatorResult::NeedMoreInput(None))
}
_ => {
let state = state
.as_any_mut()
.downcast_mut::<HashJoinProbeState>()
.expect("HashJoinProbeOperator state should be HashJoinProbeState");
let input = input.as_data();
let out = match self.join_type {
JoinType::Inner => self.probe_inner(input, state),
JoinType::Left | JoinType::Right => self.probe_left_right(input, state),
_ => {
unimplemented!("Only Inner, Left, and Right joins are supported in HashJoinProbeOperator")
}
}?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(out)))
}
let state = state
.as_any_mut()
.downcast_mut::<HashJoinProbeState>()
.expect("HashJoinProbeOperator state should be HashJoinProbeState");

if idx == 0 {
let (probe_table, tables) = input.as_probe_table();
state.set_table(probe_table, tables);
Ok(IntermediateOperatorResult::NeedMoreInput(None))
} else {
let input = input.as_data();
let out = match self.join_type {
JoinType::Inner => self.probe_inner(input, state),
JoinType::Left | JoinType::Right => self.probe_left_right(input, state),
_ => {
unimplemented!(
"Only Inner, Left, and Right joins are supported in HashJoinProbeOperator"
)
}
}?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(out)))
}
}

Expand Down
Loading