Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): Add multiplexer node to new streaming engine #18241

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/polars-stream/src/morsel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl SourceToken {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Morsel {
/// The data contained in this morsel.
df: DataFrame,
Expand Down Expand Up @@ -152,4 +152,8 @@ impl Morsel {
/// Returns a shared reference to the source token stored in this morsel.
pub fn source_token(&self) -> &SourceToken {
&self.source_token
}

/// Installs `new_token` as this morsel's source token and hands back the
/// token the morsel held before the call.
pub fn replace_source_token(&mut self, mut new_token: SourceToken) -> SourceToken {
    // After the swap `new_token` holds the previous token.
    std::mem::swap(&mut self.source_token, &mut new_token);
    new_token
}
}
1 change: 1 addition & 0 deletions crates/polars-stream/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod in_memory_map;
pub mod in_memory_sink;
pub mod in_memory_source;
pub mod map;
pub mod multiplexer;
pub mod ordered_union;
pub mod reduce;
pub mod select;
Expand Down
195 changes: 195 additions & 0 deletions crates/polars-stream/src/nodes/multiplexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use std::collections::VecDeque;

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

use super::compute_node_prelude::*;
use crate::morsel::SourceToken;

// TODO: replace this with an out-of-core buffering solution.
/// Buffer state the multiplexer keeps for a single output.
enum BufferedStream {
/// The output is still of interest; holds the morsels queued for it.
Open(VecDeque<Morsel>),
/// The output's port was observed as `Done`; nothing is kept for it.
Closed,
}

impl BufferedStream {
fn new() -> Self {
Self::Open(VecDeque::new())
}
}

/// Compute node with a single input and one or more outputs; each incoming
/// morsel is cloned for every output that is still open.
pub struct MultiplexerNode {
/// One buffer state per output; resized to the number of outputs in
/// `update_state`.
buffers: Vec<BufferedStream>,
}

impl MultiplexerNode {
pub fn new() -> Self {
Self {
buffers: Vec::default(),
}
}
}

impl ComputeNode for MultiplexerNode {
/// Returns the fixed name of this node, `"multiplexer"`.
fn name(&self) -> &str {
"multiplexer"
}

/// Recomputes the port states of the multiplexer.
///
/// `recv` must hold exactly one state (the single input) and `send` one
/// state per output. On entry `send[i]` tells whether the receiver of
/// output `i` is still interested; on return both slices hold the states
/// this node reports.
///
/// # Panics
/// Panics if `recv.len() != 1` or `send` is empty.
fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) {
    assert!(recv.len() == 1 && !send.is_empty());

    // Initialize buffered streams, and mark those for which the receiver
    // is no longer interested as closed.
    self.buffers.resize_with(send.len(), BufferedStream::new);
    for (s, b) in send.iter().zip(&mut self.buffers) {
        if *s == PortState::Done {
            *b = BufferedStream::Closed;
        }
    }

    // The input only counts as done once no open output still has
    // buffered morsels waiting to be flushed.
    let input_done = recv[0] == PortState::Done
        && self.buffers.iter().all(|b| match b {
            BufferedStream::Open(v) => v.is_empty(),
            BufferedStream::Closed => true,
        });
    let output_done = send.iter().all(|p| *p == PortState::Done);

    // If either side is done, everything is done.
    if input_done || output_done {
        recv[0] = PortState::Done;
        for s in send.iter_mut() {
            *s = PortState::Done;
        }
        return;
    }

    // Must be computed before the output states are overwritten below.
    let all_blocked = send.iter().all(|p| *p == PortState::Blocked);

    // Pass along the input state to the outputs. An output that still has
    // buffered morsels must not be reported as done just because the input
    // finished: it stays ready so its backlog can still be flushed.
    let input_state = recv[0];
    for (s, b) in send.iter_mut().zip(&self.buffers) {
        let has_backlog = matches!(b, BufferedStream::Open(v) if !v.is_empty());
        *s = if input_state == PortState::Done && has_backlog {
            PortState::Ready
        } else {
            input_state
        };
    }

    // We say we are ready to receive unless all outputs are blocked.
    recv[0] = if all_blocked {
        PortState::Blocked
    } else {
        PortState::Ready
    };
}

/// Spawns the tasks that move morsels from the single input to the outputs.
///
/// * If the input port is present, one task receives morsels from it. For
///   every open output it either forwards a clone through an unbounded
///   channel (send port present) or stores a clone in that output's buffer
///   (send port absent).
/// * For every open output whose send port is present, one task first
///   drains that output's buffer and then forwards what arrives on its
///   channel.
///
/// All spawned tasks are pushed onto `join_handles`.
///
/// # Panics
/// Panics if `recv.len() != 1`, if `send` is empty, or if the number of
/// buffers differs from `send.len()`.
fn spawn<'env, 's>(
    &'env mut self,
    scope: &'s TaskScope<'s, 'env>,
    recv: &mut [Option<RecvPort<'_>>],
    send: &mut [Option<SendPort<'_>>],
    _state: &'s ExecutionState,
    join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
    assert!(recv.len() == 1 && !send.is_empty());
    assert!(self.buffers.len() == send.len());

    /// How the receiving task treats one output.
    enum Listener<'a> {
        /// Output is being sent to: morsels go through the channel.
        Active(UnboundedSender<Morsel>),
        /// Output is open but has no send port: morsels are buffered.
        Buffering(&'a mut VecDeque<Morsel>),
        /// Output is closed, or its channel receiver is gone.
        Inactive,
    }

    // Token attached to morsels replayed from the buffers, so that a stop
    // request made on such a morsel can be observed by the tasks below.
    let buffered_source_token = SourceToken::new();

    let (mut buf_senders, buf_receivers): (Vec<_>, Vec<_>) = self
        .buffers
        .iter_mut()
        .zip(send.iter())
        .map(|(buffer, send_port)| {
            if let BufferedStream::Open(buf) = buffer {
                if send_port.is_some() {
                    // TODO: replace with a bounded channel and store data
                    // out-of-core beyond a certain size.
                    let (tx, rx) = unbounded_channel();
                    (Listener::Active(tx), Some((buf, rx)))
                } else {
                    (Listener::Buffering(buf), None)
                }
            } else {
                (Listener::Inactive, None)
            }
        })
        .unzip();

    // TODO: parallel multiplexing.
    if let Some(mut receiver) = recv[0].take().map(|r| r.serial()) {
        let buffered_source_token = buffered_source_token.clone();
        join_handles.push(scope.spawn_task(TaskPriority::High, async move {
            while let Ok(morsel) = receiver.recv().await {
                let mut anyone_interested = false;
                let mut active_listener_interested = false;
                for buf_sender in &mut buf_senders {
                    match buf_sender {
                        Listener::Active(s) => match s.send(morsel.clone()) {
                            Ok(_) => {
                                anyone_interested = true;
                                active_listener_interested = true;
                            },
                            // The channel receiver is gone; stop sending
                            // to this output.
                            Err(_) => *buf_sender = Listener::Inactive,
                        },
                        Listener::Buffering(b) => {
                            // Make sure to count buffered morsels as
                            // consumed to not block the source.
                            let mut m = morsel.clone();
                            m.take_consume_token();
                            // Pushed at the front and popped from the back
                            // when flushing, so arrival order is kept.
                            b.push_front(m);
                            anyone_interested = true;
                        },
                        Listener::Inactive => {},
                    }
                }

                if !anyone_interested {
                    break;
                }

                // If only buffering inputs are left, or we got a stop
                // request from an input reading from old buffered data,
                // request a stop from the source.
                if !active_listener_interested || buffered_source_token.stop_requested() {
                    morsel.source_token().stop();
                }
            }

            Ok(())
        }));
    }

    for (send_port, opt_buf_recv) in send.iter_mut().zip(buf_receivers) {
        if let Some((buf, mut rx)) = opt_buf_recv {
            // `Some` is only produced above for outputs whose send port is
            // present, so this unwrap cannot fail.
            let mut sender = send_port.take().unwrap().serial();

            let buffered_source_token = buffered_source_token.clone();
            join_handles.push(scope.spawn_task(TaskPriority::High, async move {
                // First we try to flush all the old buffered data.
                // NOTE(review): when this loop exits on a stop request while
                // `buf` is non-empty, the loop below still forwards fresh
                // morsels ahead of the remaining buffered ones - confirm
                // that downstream tolerates this ordering.
                while let Some(mut morsel) = buf.pop_back() {
                    morsel.replace_source_token(buffered_source_token.clone());
                    if sender.send(morsel).await.is_err()
                        || buffered_source_token.stop_requested()
                    {
                        break;
                    }
                }

                // Then send along data from the multiplexer.
                while let Some(morsel) = rx.recv().await {
                    if sender.send(morsel).await.is_err() {
                        break;
                    }
                }
                Ok(())
            }));
        }
    }
}
}
5 changes: 5 additions & 0 deletions crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,9 @@ pub enum PhysNode {
/// which case they are broadcast.
null_extend: bool,
},

#[allow(unused)]
Multiplexer {
input: PhysNodeKey,
},
}
6 changes: 6 additions & 0 deletions crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ fn to_graph_rec<'a>(
input_keys,
)
},

Multiplexer { input } => {
let input_key = to_graph_rec(*input, ctx)?;
ctx.graph
.add_node(nodes::multiplexer::MultiplexerNode::new(), [input_key])
},
};

ctx.phys_to_graph.insert(phys_node_key, graph_key);
Expand Down