Skip to content

Commit

Permalink
refactor: Simplify streaming placeholder replacement. (#14915)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 8, 2024
1 parent e2f8188 commit 5eeae45
Show file tree
Hide file tree
Showing 15 changed files with 221 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use polars_core::prelude::*;
use polars_io::predicates::{PhysicalIoExpr, StatsEvaluator};
use polars_pipe::expressions::PhysicalPipedExpr;
use polars_pipe::operators::chunks::DataChunk;
use polars_pipe::pipeline::{create_pipeline, get_dummy_operator, get_operator, PipeLine};
use polars_pipe::pipeline::{
create_pipeline, get_dummy_operator, get_operator, CallBacks, PipeLine,
};
use polars_pipe::SExecutionContext;

use crate::physical_plan::planner::{create_physical_expr, ExpressionConversionState};
Expand Down Expand Up @@ -105,27 +107,37 @@ pub(super) fn construct(
use ALogicalPlan::*;

let mut pipelines = Vec::with_capacity(tree.len());
let mut callbacks = CallBacks::new();

let is_verbose = verbose();

// first traverse the branches and nodes to determine how often a sink is
// shared
// this shared count will be used in the pipeline to determine
// First traverse the branches and nodes to determine how often a sink is
// shared.
// This shared count will be used in the pipeline to determine
// when the sink can be finalized.
let mut sink_share_count = PlHashMap::new();
let n_branches = tree.len();
if n_branches > 1 {
for branch in &tree {
for sink in branch.iter_sinks() {
let count = sink_share_count
.entry(sink.0)
.or_insert(Rc::new(RefCell::new(0u32)));
*count.borrow_mut() += 1;
for op in branch.operators_sinks.iter() {
match op {
PipelineNode::Sink(sink) => {
let count = sink_share_count
.entry(sink.0)
.or_insert(Rc::new(RefCell::new(0u32)));
*count.borrow_mut() += 1;
},
PipelineNode::RhsJoin(node) => {
let _ = callbacks.insert(*node, get_dummy_operator());
},
_ => {},
}
}
}
}

// shared sinks are stored in a cache, so that they share info
// Shared sinks are stored in a cache, so that they share state.
// If the shared sink is already in cache, that one is used.
let mut sink_cache = PlHashMap::new();
let mut final_sink = None;

Expand Down Expand Up @@ -172,8 +184,8 @@ pub(super) fn construct(
PipelineNode::RhsJoin(node) => {
operator_nodes.push(node);
jit_insert_slice(node, lp_arena, &mut sink_nodes, operator_offset);
let op = get_dummy_operator();
operators.push(op)
let op = callbacks.get(&node).unwrap().clone();
operators.push(Box::new(op))
},
}
}
Expand All @@ -182,13 +194,13 @@ pub(super) fn construct(
let pipeline = create_pipeline(
&branch.sources,
operators,
operator_nodes,
sink_nodes,
lp_arena,
expr_arena,
to_physical_piped_expr,
is_verbose,
&mut sink_cache,
&mut callbacks,
)?;
pipelines.push((execution_id, pipeline));
}
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-lazy/src/physical_plan/streaming/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ impl Branch {
// so the first sink is the final one.
self.operators_sinks.iter().find_map(sink_node)
}
/// Iterates over `operators_sinks` in order and yields the node of every
/// entry for which `sink_node` returns `Some`.
pub(super) fn iter_sinks(&self) -> impl Iterator<Item = Node> + '_ {
    let nodes = self.operators_sinks.iter();
    nodes.filter_map(sink_node)
}

pub(super) fn split(&self) -> Self {
Self {
execution_id: self.execution_id,
Expand Down
77 changes: 73 additions & 4 deletions crates/polars-pipe/src/executors/operators/placeholder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,75 @@
use std::sync::{Arc, Mutex};

use polars_core::error::PolarsResult;

use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};

#[derive(Default)]
pub struct PlaceHolder {}
/// A cloneable handle to a shared, initially empty operator slot.
///
/// Every clone points at the same `Arc<Mutex<Option<..>>>`, so an operator
/// stored through one handle is visible through all other clones.
#[derive(Clone)]
struct CallBack {
    inner: Arc<Mutex<Option<Box<dyn Operator>>>>,
}

impl CallBack {
    /// Creates a handle whose slot does not hold an operator yet.
    fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(None)),
        }
    }

    /// Stores `op` in the shared slot, dropping whatever was stored before.
    ///
    /// # Panics
    ///
    /// Panics with "no-contention" if the mutex cannot be acquired
    /// immediately (it is currently held or poisoned).
    fn replace(&self, op: Box<dyn Operator>) {
        *self.inner.try_lock().expect("no-contention") = Some(op);
    }
}

/// `CallBack` forwards every call to the operator stored in its shared slot.
///
/// The slot starts out empty and is filled through `CallBack::replace`. Calling
/// `execute`, `flush` or `must_flush` before that happened is a logic error and
/// panics with "placeholder should be replaced".
impl Operator for CallBack {
    /// Runs the stored operator on `chunk`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex cannot be acquired immediately, or if no operator
    /// has been stored yet.
    fn execute(
        &mut self,
        context: &PExecutionContext,
        chunk: &DataChunk,
    ) -> PolarsResult<OperatorResult> {
        let mut slot = self.inner.try_lock().expect("no-contention");
        slot.as_mut()
            .expect("placeholder should be replaced")
            .execute(context, chunk)
    }

    /// Flushes the stored operator.
    ///
    /// # Panics
    ///
    /// Panics if the mutex cannot be acquired immediately, or if no operator
    /// has been stored yet.
    fn flush(&mut self) -> PolarsResult<OperatorResult> {
        let mut slot = self.inner.try_lock().expect("no-contention");
        slot.as_mut()
            .expect("placeholder should be replaced")
            .flush()
    }

    /// Reports whether the stored operator must be flushed.
    ///
    /// # Panics
    ///
    /// Panics if the mutex cannot be acquired immediately, or if no operator
    /// has been stored yet.
    fn must_flush(&self) -> bool {
        let slot = self.inner.try_lock().expect("no-contention");
        slot.as_ref()
            .expect("placeholder should be replaced")
            .must_flush()
    }

    /// Not supported: this implementation always panics when asked to split.
    fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
        panic!("should not be called")
    }

    fn fmt(&self) -> &str {
        "callback"
    }
}

/// Keeps a shared list of `(thread_no, CallBack)` pairs so that every
/// registered callback can later be handed a real operator.
///
/// Clones share the same list through the inner `Arc`.
#[derive(Clone, Default)]
pub struct PlaceHolder {
    inner: Arc<Mutex<Vec<(usize, CallBack)>>>,
}

impl PlaceHolder {
    /// Creates a placeholder with no registered callbacks.
    pub fn new() -> Self {
        Self::default()
    }

    /// Gives every registered callback its own split of `op`, created with
    /// the thread number the callback was registered under.
    ///
    /// # Panics
    ///
    /// Panics if the inner mutex is poisoned, or if a callback's slot cannot
    /// be locked immediately.
    pub fn replace(&self, op: Box<dyn Operator>) {
        let registered = self.inner.lock().unwrap();
        registered
            .iter()
            .for_each(|(thread_no, callback)| callback.replace(op.split(*thread_no)));
    }
}

impl Operator for PlaceHolder {
fn execute(
Expand All @@ -14,8 +80,11 @@ impl Operator for PlaceHolder {
panic!("placeholder should be replaced")
}

fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(Self {})
/// Registers a fresh, empty `CallBack` under `thread_no` and returns a clone
/// of it as the per-thread operator, so that a later `PlaceHolder::replace`
/// reaches the returned operator through the shared slot.
fn split(&self, thread_no: usize) -> Box<dyn Operator> {
    let callback = CallBack::new();
    self.inner
        .lock()
        .unwrap()
        .push((thread_no, callback.clone()));
    Box::new(callback)
}

fn fmt(&self) -> &str {
Expand Down
90 changes: 2 additions & 88 deletions crates/polars-pipe/src/executors/operators/reproject.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,8 @@
use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::prelude::SchemaRef;
use polars_core::schema::Schema;

use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext, PolarsResult};

/// Wraps another operator and re-projects its output chunks so that the
/// column order of `schema` is kept.
pub(crate) struct ReProjectOperator {
    schema: SchemaRef,
    operator: Box<dyn Operator>,
    // Column positions, cached between calls to `reproject_chunk`.
    positions: Vec<usize>,
}

impl ReProjectOperator {
    /// Wraps `operator`; the position cache starts out empty.
    pub(crate) fn new(schema: SchemaRef, operator: Box<dyn Operator>) -> Self {
        let positions = Vec::new();
        Self {
            schema,
            operator,
            positions,
        }
    }
}
use crate::operators::DataChunk;

pub(crate) fn reproject_chunk(
chunk: &mut DataChunk,
Expand Down Expand Up @@ -50,71 +32,3 @@ pub(crate) fn reproject_chunk(
*chunk = chunk.with_data(out);
Ok(())
}

impl Operator for ReProjectOperator {
    /// Runs the wrapped operator and re-projects any chunk it produces.
    ///
    /// `NeedsNewData` is passed through untouched; `Finished` and
    /// `HaveMoreOutPut` keep their variant but carry the re-projected chunk.
    /// Errors from the wrapped operator or from `reproject_chunk` are
    /// propagated.
    fn execute(
        &mut self,
        context: &PExecutionContext,
        chunk: &DataChunk,
    ) -> PolarsResult<OperatorResult> {
        match self.operator.execute(context, chunk)? {
            OperatorResult::NeedsNewData => Ok(OperatorResult::NeedsNewData),
            OperatorResult::Finished(mut out) => {
                reproject_chunk(&mut out, &mut self.positions, self.schema.as_ref())?;
                Ok(OperatorResult::Finished(out))
            },
            OperatorResult::HaveMoreOutPut(mut out) => {
                reproject_chunk(&mut out, &mut self.positions, self.schema.as_ref())?;
                Ok(OperatorResult::HaveMoreOutPut(out))
            },
        }
    }

    /// Splits the wrapped operator for `thread_no` and wraps the result with a
    /// copy of the schema handle and of the currently cached positions.
    fn split(&self, thread_no: usize) -> Box<dyn Operator> {
        Box::new(Self {
            schema: self.schema.clone(),
            operator: self.operator.split(thread_no),
            positions: self.positions.clone(),
        })
    }

    fn fmt(&self) -> &str {
        "re-project-operator"
    }
}

#[cfg(test)]
mod test {
    use polars_core::prelude::*;

    use super::*;

    /// Re-projects two chunks built from the same frame onto a schema with a
    /// different column order, sharing one `positions` buffer between calls.
    #[test]
    fn test_reproject_chunk() {
        let df = df![
            "a" => [1, 2],
            "b" => [1, 2],
            "c" => [1, 2],
            "d" => [1, 2],
        ]
        .unwrap();

        let mut first = DataChunk::new(0, df.clone());
        let mut second = DataChunk::new(1, df);

        // Target order differs from the frame's a, b, c, d.
        let mut out_schema = Schema::new();
        for name in ["c", "b", "d", "a"] {
            out_schema.with_column(name.into(), DataType::Int32);
        }

        let mut positions = vec![];
        reproject_chunk(&mut first, &mut positions, &out_schema).unwrap();
        // The second call runs with the positions filled in by the first one.
        reproject_chunk(&mut second, &mut positions, &out_schema).unwrap();

        assert_eq!(&first.data.schema(), &out_schema);
        assert_eq!(&second.data.schema(), &out_schema);
    }
}
19 changes: 15 additions & 4 deletions crates/polars-pipe/src/executors/sinks/joins/cross.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use polars_ops::prelude::CrossJoin as CrossJoinTrait;
use polars_utils::arena::Node;
use smartstring::alias::String as SmartString;

use crate::executors::operators::PlaceHolder;
use crate::operators::{
chunks_to_df_unchecked, DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext,
Sink, SinkResult,
Expand All @@ -21,15 +22,22 @@ pub struct CrossJoin {
suffix: SmartString,
swapped: bool,
node: Node,
placeholder: PlaceHolder,
}

impl CrossJoin {
pub(crate) fn new(suffix: SmartString, swapped: bool, node: Node) -> Self {
pub(crate) fn new(
suffix: SmartString,
swapped: bool,
node: Node,
placeholder: PlaceHolder,
) -> Self {
CrossJoin {
chunks: vec![],
suffix,
swapped,
node,
placeholder,
}
}
}
Expand Down Expand Up @@ -57,21 +65,24 @@ impl Sink for CrossJoin {
Box::new(Self {
suffix: self.suffix.clone(),
swapped: self.swapped,
placeholder: self.placeholder.clone(),
..Default::default()
})
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
// todo! share sink
Ok(FinalizedSink::Operator(Box::new(CrossJoinProbe {
let op = Box::new(CrossJoinProbe {
df: Arc::new(chunks_to_df_unchecked(std::mem::take(&mut self.chunks))),
suffix: Arc::from(self.suffix.as_ref()),
in_process_left: None,
in_process_right: None,
in_process_left_df: Default::default(),
output_names: None,
swapped: self.swapped,
})))
});
self.placeholder.replace(op);

Ok(FinalizedSink::Operator)
}

fn as_any(&mut self) -> &mut dyn Any {
Expand Down
Loading

0 comments on commit 5eeae45

Please sign in to comment.