refactor: Simplify streaming placeholder replacement. #14915

Merged
merged 5 commits on Mar 8, 2024
@@ -7,7 +7,9 @@ use polars_core::prelude::*;
use polars_io::predicates::{PhysicalIoExpr, StatsEvaluator};
use polars_pipe::expressions::PhysicalPipedExpr;
use polars_pipe::operators::chunks::DataChunk;
use polars_pipe::pipeline::{create_pipeline, get_dummy_operator, get_operator, PipeLine};
use polars_pipe::pipeline::{
create_pipeline, get_dummy_operator, get_operator, CallBacks, PipeLine,
};
use polars_pipe::SExecutionContext;

use crate::physical_plan::planner::{create_physical_expr, ExpressionConversionState};
@@ -105,27 +107,37 @@ pub(super) fn construct(
use ALogicalPlan::*;

let mut pipelines = Vec::with_capacity(tree.len());
let mut callbacks = CallBacks::new();

let is_verbose = verbose();

// first traverse the branches and nodes to determine how often a sink is
// shared
// this shared count will be used in the pipeline to determine
// First traverse the branches and nodes to determine how often a sink is
// shared.
// This shared count will be used in the pipeline to determine
// when the sink can be finalized.
let mut sink_share_count = PlHashMap::new();
let n_branches = tree.len();
if n_branches > 1 {
for branch in &tree {
for sink in branch.iter_sinks() {
let count = sink_share_count
.entry(sink.0)
.or_insert(Rc::new(RefCell::new(0u32)));
*count.borrow_mut() += 1;
for op in branch.operators_sinks.iter() {
match op {
PipelineNode::Sink(sink) => {
let count = sink_share_count
.entry(sink.0)
.or_insert(Rc::new(RefCell::new(0u32)));
*count.borrow_mut() += 1;
},
PipelineNode::RhsJoin(node) => {
let _ = callbacks.insert(*node, get_dummy_operator());
},
_ => {},
}
}
}
}

// shared sinks are stored in a cache, so that they share info
// Shared sinks are stored in a cache, so that they share state.
// If the shared sink is already in cache, that one is used.
let mut sink_cache = PlHashMap::new();
let mut final_sink = None;

@@ -172,8 +184,8 @@ pub(super) fn construct(
PipelineNode::RhsJoin(node) => {
operator_nodes.push(node);
jit_insert_slice(node, lp_arena, &mut sink_nodes, operator_offset);
let op = get_dummy_operator();
operators.push(op)
let op = callbacks.get(&node).unwrap().clone();
operators.push(Box::new(op))
},
}
}
@@ -182,13 +194,13 @@
let pipeline = create_pipeline(
&branch.sources,
operators,
operator_nodes,
sink_nodes,
lp_arena,
expr_arena,
to_physical_piped_expr,
is_verbose,
&mut sink_cache,
&mut callbacks,
)?;
pipelines.push((execution_id, pipeline));
}
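The construct hunks above follow a two-pass shape: the first pass over the branches registers a placeholder for every right-hand-side join node while counting shared sinks, and the second pass looks up that same placeholder when assembling the branch's operator chain instead of pushing a fresh dummy operator. A minimal, runnable sketch of that registry shape, using toy types and hypothetical node ids (the real `CallBacks` type lives in polars-pipe and is not part of this diff):

```rust
use std::collections::HashMap;

type NodeId = usize;

// Toy stand-in for polars-pipe's `PlaceHolder` operator; only the
// registry shape matters for this sketch.
#[derive(Clone, Default)]
struct PlaceHolder;

fn main() {
    // Pass 1: while walking branches to count shared sinks, register one
    // placeholder per right-hand-side join node (hypothetical node ids).
    let rhs_join_nodes: [NodeId; 2] = [3, 7];
    let mut callbacks: HashMap<NodeId, PlaceHolder> = HashMap::new();
    for node in rhs_join_nodes {
        callbacks.insert(node, PlaceHolder::default());
    }

    // Pass 2: when a branch's operator chain is assembled, the same
    // placeholder is fetched and pushed as the probe-side operator; the
    // join sink fills it in later, at finalize time.
    let node: NodeId = 3;
    let op = callbacks.get(&node).expect("registered in pass 1").clone();
    let operators = vec![op];

    // The registry itself is then handed to pipeline construction
    // (`&mut callbacks` in the diff) so each sink can reach its placeholder.
    println!("{} operator(s), {} placeholder(s)", operators.len(), callbacks.len());
}
```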
4 changes: 0 additions & 4 deletions crates/polars-lazy/src/physical_plan/streaming/tree.rs
@@ -54,10 +54,6 @@ impl Branch {
// so the first sink is the final one.
self.operators_sinks.iter().find_map(sink_node)
}
pub(super) fn iter_sinks(&self) -> impl Iterator<Item = Node> + '_ {
self.operators_sinks.iter().flat_map(sink_node)
}

pub(super) fn split(&self) -> Self {
Self {
execution_id: self.execution_id,
77 changes: 73 additions & 4 deletions crates/polars-pipe/src/executors/operators/placeholder.rs
@@ -1,9 +1,75 @@
use std::sync::{Arc, Mutex};

use polars_core::error::PolarsResult;

use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};

#[derive(Default)]
pub struct PlaceHolder {}
#[derive(Clone)]
struct CallBack {
inner: Arc<Mutex<Option<Box<dyn Operator>>>>,
}

impl CallBack {
fn new() -> Self {
Self {
inner: Default::default(),
}
}

fn replace(&self, op: Box<dyn Operator>) {
let mut lock = self.inner.try_lock().expect("no-contention");
*lock = Some(op);
}
}

impl Operator for CallBack {
fn execute(
&mut self,
context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
let mut lock = self.inner.try_lock().expect("no-contention");
lock.as_mut().unwrap().execute(context, chunk)
}

fn flush(&mut self) -> PolarsResult<OperatorResult> {
let mut lock = self.inner.try_lock().expect("no-contention");
lock.as_mut().unwrap().flush()
}

fn must_flush(&self) -> bool {
let lock = self.inner.try_lock().expect("no-contention");
lock.as_ref().unwrap().must_flush()
}

fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
panic!("should not be called")
}

fn fmt(&self) -> &str {
"callback"
}
}

#[derive(Clone, Default)]
pub struct PlaceHolder {
inner: Arc<Mutex<Vec<(usize, CallBack)>>>,
}

impl PlaceHolder {
pub fn new() -> Self {
Self {
inner: Arc::new(Default::default()),
}
}

pub fn replace(&self, op: Box<dyn Operator>) {
let inner = self.inner.lock().unwrap();
for (thread_no, cb) in inner.iter() {
cb.replace(op.split(*thread_no))
}
}
}

impl Operator for PlaceHolder {
fn execute(
@@ -14,8 +80,11 @@ impl Operator for PlaceHolder {
panic!("placeholder should be replaced")
}

fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(Self {})
fn split(&self, thread_no: usize) -> Box<dyn Operator> {
let cb = CallBack::new();
let mut inner = self.inner.lock().unwrap();
inner.push((thread_no, cb.clone()));
Box::new(cb)
}

fn fmt(&self) -> &str {
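The new `PlaceHolder`/`CallBack` pair above is the core of the refactor: `split` hands each pipeline thread a `CallBack` whose operator slot starts empty, and `replace` later fills every registered slot with a per-thread split of the real operator once the sink has built it. A stripped-down, runnable sketch of that lifecycle with a toy `Op` trait and simplified names (not the actual polars-pipe API, just the same pattern):

```rust
use std::sync::{Arc, Mutex};

// Minimal stand-in for the operator trait; only what the sketch needs.
trait Op: Send {
    fn run(&mut self, x: i64) -> i64;
    fn split(&self, thread_no: usize) -> Box<dyn Op>;
}

// The concrete operator that only exists once the sink has finalized.
#[derive(Clone)]
struct AddConst(i64);

impl Op for AddConst {
    fn run(&mut self, x: i64) -> i64 {
        x + self.0
    }
    fn split(&self, _thread_no: usize) -> Box<dyn Op> {
        Box::new(self.clone())
    }
}

// Per-thread slot that starts empty and is filled in later.
#[derive(Clone, Default)]
struct CallBack {
    inner: Arc<Mutex<Option<Box<dyn Op>>>>,
}

impl Op for CallBack {
    fn run(&mut self, x: i64) -> i64 {
        // Delegates to the real operator; panics if it was never replaced.
        self.inner
            .lock()
            .unwrap()
            .as_mut()
            .expect("callback not yet replaced")
            .run(x)
    }
    fn split(&self, _thread_no: usize) -> Box<dyn Op> {
        unreachable!("callbacks are not split further")
    }
}

// Handed out while the pipeline is built: every `split` registers a
// per-thread callback; `replace` fills all of them with the real operator.
#[derive(Clone, Default)]
struct PlaceHolder {
    slots: Arc<Mutex<Vec<(usize, CallBack)>>>,
}

impl PlaceHolder {
    fn split(&self, thread_no: usize) -> Box<dyn Op> {
        let cb = CallBack::default();
        self.slots.lock().unwrap().push((thread_no, cb.clone()));
        Box::new(cb)
    }

    fn replace(&self, op: Box<dyn Op>) {
        let slots = self.slots.lock().unwrap();
        for (thread_no, cb) in slots.iter() {
            // Each thread gets its own split of the finalized operator.
            *cb.inner.lock().unwrap() = Some(op.split(*thread_no));
        }
    }
}

fn main() {
    let placeholder = PlaceHolder::default();

    // Pipeline construction: each thread gets a callback before the
    // probe-side data exists.
    let mut per_thread: Vec<Box<dyn Op>> = (0..4).map(|t| placeholder.split(t)).collect();

    // Sink finalization: the real operator is now known and is pushed
    // into every registered slot.
    placeholder.replace(Box::new(AddConst(10)));

    // Execution: the callbacks transparently delegate to the real operator.
    let out: Vec<i64> = per_thread.iter_mut().map(|op| op.run(1)).collect();
    assert_eq!(out, vec![11, 11, 11, 11]);
}
```

The sketch keeps only `run`; the real `CallBack` in the diff also forwards `flush` and `must_flush`, and the real `PlaceHolder` panics in `execute` if it is ever reached before replacement.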
90 changes: 2 additions & 88 deletions crates/polars-pipe/src/executors/operators/reproject.rs
@@ -1,26 +1,8 @@
use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::prelude::SchemaRef;
use polars_core::schema::Schema;

use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext, PolarsResult};

/// An operator that will ensure we keep the schema order
pub(crate) struct ReProjectOperator {
schema: SchemaRef,
operator: Box<dyn Operator>,
// cache the positions
positions: Vec<usize>,
}

impl ReProjectOperator {
pub(crate) fn new(schema: SchemaRef, operator: Box<dyn Operator>) -> Self {
ReProjectOperator {
schema,
operator,
positions: vec![],
}
}
}
use crate::operators::DataChunk;

pub(crate) fn reproject_chunk(
chunk: &mut DataChunk,
@@ -50,71 +32,3 @@ pub(crate) fn reproject_chunk(
*chunk = chunk.with_data(out);
Ok(())
}

impl Operator for ReProjectOperator {
fn execute(
&mut self,
context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
let (mut chunk, finished) = match self.operator.execute(context, chunk)? {
OperatorResult::Finished(chunk) => (chunk, true),
OperatorResult::HaveMoreOutPut(chunk) => (chunk, false),
OperatorResult::NeedsNewData => return Ok(OperatorResult::NeedsNewData),
};
reproject_chunk(&mut chunk, &mut self.positions, self.schema.as_ref())?;
Ok(if finished {
OperatorResult::Finished(chunk)
} else {
OperatorResult::HaveMoreOutPut(chunk)
})
}

fn split(&self, thread_no: usize) -> Box<dyn Operator> {
let operator = self.operator.split(thread_no);
Box::new(Self {
schema: self.schema.clone(),
positions: self.positions.clone(),
operator,
})
}

fn fmt(&self) -> &str {
"re-project-operator"
}
}

#[cfg(test)]
mod test {
use polars_core::prelude::*;

use super::*;

#[test]
fn test_reproject_chunk() {
let df = df![
"a" => [1, 2],
"b" => [1, 2],
"c" => [1, 2],
"d" => [1, 2],
]
.unwrap();

let mut chunk1 = DataChunk::new(0, df.clone());
let mut chunk2 = DataChunk::new(1, df);

let mut positions = vec![];

let mut out_schema = Schema::new();
out_schema.with_column("c".into(), DataType::Int32);
out_schema.with_column("b".into(), DataType::Int32);
out_schema.with_column("d".into(), DataType::Int32);
out_schema.with_column("a".into(), DataType::Int32);

reproject_chunk(&mut chunk1, &mut positions, &out_schema).unwrap();
// second call cached the positions
reproject_chunk(&mut chunk2, &mut positions, &out_schema).unwrap();
assert_eq!(&chunk1.data.schema(), &out_schema);
assert_eq!(&chunk2.data.schema(), &out_schema);
}
}
19 changes: 15 additions & 4 deletions crates/polars-pipe/src/executors/sinks/joins/cross.rs
@@ -10,6 +10,7 @@ use polars_ops::prelude::CrossJoin as CrossJoinTrait;
use polars_utils::arena::Node;
use smartstring::alias::String as SmartString;

use crate::executors::operators::PlaceHolder;
use crate::operators::{
chunks_to_df_unchecked, DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext,
Sink, SinkResult,
@@ -21,15 +22,22 @@ pub struct CrossJoin {
suffix: SmartString,
swapped: bool,
node: Node,
placeholder: PlaceHolder,
}

impl CrossJoin {
pub(crate) fn new(suffix: SmartString, swapped: bool, node: Node) -> Self {
pub(crate) fn new(
suffix: SmartString,
swapped: bool,
node: Node,
placeholder: PlaceHolder,
) -> Self {
CrossJoin {
chunks: vec![],
suffix,
swapped,
node,
placeholder,
}
}
}
@@ -57,21 +65,24 @@ impl Sink for CrossJoin {
Box::new(Self {
suffix: self.suffix.clone(),
swapped: self.swapped,
placeholder: self.placeholder.clone(),
..Default::default()
})
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
// todo! share sink
Ok(FinalizedSink::Operator(Box::new(CrossJoinProbe {
let op = Box::new(CrossJoinProbe {
df: Arc::new(chunks_to_df_unchecked(std::mem::take(&mut self.chunks))),
suffix: Arc::from(self.suffix.as_ref()),
in_process_left: None,
in_process_right: None,
in_process_left_df: Default::default(),
output_names: None,
swapped: self.swapped,
})))
});
self.placeholder.replace(op);

Ok(FinalizedSink::Operator)
}

fn as_any(&mut self) -> &mut dyn Any {