Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Streaming physical writes for native executor #2992

Merged
merged 18 commits into from
Oct 31, 2024
Prev Previous commit
Next Next commit
Merge branch main into colin/streaming-physical-writes
  • Loading branch information
Colin Ho authored and Colin Ho committed Oct 31, 2024
commit 92c4d03cf32d68f584d5f98df84abe8dede53fbe
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ python = [
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
"common-resource-request/python"
"common-resource-request/python",
"dep:pyo3",
"dep:pyo3-log"
]

[lib]
Expand Down Expand Up @@ -151,7 +153,9 @@ members = [
"src/daft-sketch",
"src/daft-sql",
"src/daft-writers",
"src/hyperloglog"
"src/daft-table",
"src/hyperloglog",
"src/parquet2"
]

[workspace.dependencies]
Expand Down
7 changes: 4 additions & 3 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use daft_core::{
use daft_dsl::{col, join::get_common_join_keys, Expr};
use daft_micropartition::MicroPartition;
use daft_physical_plan::{
Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan, Limit,
LocalPhysicalPlan, PhysicalWrite, Pivot, Project, Sample, Sort, UnGroupedAggregate, Unpivot,
ActorPoolProject, Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan,
Limit, LocalPhysicalPlan, PhysicalWrite, Pivot, Project, Sample, Sort, UnGroupedAggregate,
Unpivot,
};
use daft_plan::{populate_aggregation_stages, JoinType};
use daft_table::ProbeState;
Expand Down Expand Up @@ -136,7 +137,7 @@ pub fn physical_plan_to_pipeline(
input, projection, ..
}) => {
let proj_op = ActorPoolProjectOperator::new(projection.clone());
let child_node = physical_plan_to_pipeline(input, psets)?;
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
IntermediateNode::new(Arc::new(proj_op), vec![child_node]).boxed()
}
LocalPhysicalPlan::Sample(Sample {
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.