Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion timely/examples/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn main() {
timely::execute_from_args(std::env::args().skip(2), move |worker| {

worker.dataflow(move |scope| {
let (handle, stream) = scope.feedback::<usize>(1);
let (handle, stream) = scope.feedback::<Vec<usize>>(1);
stream.unary_notify(
Pipeline,
"Barrier",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
//! Create cycles in a timely dataflow graph.

use crate::{Container, Data};
use crate::Container;

use crate::progress::{Timestamp, PathSummary};
use crate::progress::frontier::Antichain;
use crate::order::Product;

use crate::dataflow::channels::pushers::Tee;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{StreamCore, Scope, Stream};
use crate::dataflow::{StreamCore, Scope};
use crate::dataflow::scopes::child::Iterative;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::operators::generic::OutputWrapper;

/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
pub trait Feedback<G: Scope> {
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.

/// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Data passed through the stream will have their
/// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Containers passed through the stream will have their
/// timestamps advanced by `summary`.
///
/// # Examples
Expand All @@ -36,38 +37,15 @@ pub trait Feedback<G: Scope> {
/// .connect_loop(handle);
/// });
/// ```
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>);

/// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Data passed through the stream will have their
/// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`.
///
/// # Examples
/// ```
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
///
/// timely::example(|scope| {
/// // circulate 0..10 for 100 iterations.
/// let (handle, cycle) = scope.feedback_core::<Vec<_>>(1);
/// (0..10).to_stream(scope)
/// .concat(&cycle)
/// .inspect(|x| println!("seen: {:?}", x))
/// .branch_when(|t| t < &100).1
/// .connect_loop(handle);
/// });
/// ```
fn feedback_core<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, C>, StreamCore<G, C>);
fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>);
}

/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Data passed through the stream will have their
/// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Containers passed through the stream will have their
/// timestamps advanced by `summary`.
///
/// # Examples
Expand All @@ -87,26 +65,23 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
/// });
/// });
/// ```
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
}

impl<G: Scope> Feedback<G> for G {
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>) {
self.feedback_core(summary)
}

fn feedback_core<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, C>, StreamCore<G, C>) {
fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {

let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
let (output, stream) = builder.new_output();

(HandleCore { builder, summary, output }, stream)
(Handle { builder, summary, output }, stream)
}
}

impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
self.feedback_core(Product::new(Default::default(), summary))
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
self.feedback(Product::new(Default::default(), summary))
}
}

Expand All @@ -129,15 +104,15 @@ pub trait ConnectLoop<G: Scope, C: Container> {
/// .connect_loop(handle);
/// });
/// ```
fn connect_loop(&self, _: HandleCore<G, C>);
fn connect_loop(&self, handle: Handle<G, C>);
}

impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {
fn connect_loop(&self, helper: HandleCore<G, C>) {
fn connect_loop(&self, handle: Handle<G, C>) {

let mut builder = helper.builder;
let summary = helper.summary;
let mut output = helper.output;
let mut builder = handle.builder;
let summary = handle.summary;
let mut output = handle.output;

let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);

Expand All @@ -159,11 +134,8 @@ impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {

/// A handle used to bind the source of a loop variable.
#[derive(Debug)]
pub struct HandleCore<G: Scope, C: Container> {
pub struct Handle<G: Scope, C: Container> {
builder: OperatorBuilder<G>,
summary: <G::Timestamp as Timestamp>::Summary,
output: OutputWrapper<G::Timestamp, C, Tee<G::Timestamp, C>>,
}

/// A `HandleCore` specialized for using `Vec` as container
pub type Handle<G, D> = HandleCore<G, Vec<D>>;
7 changes: 7 additions & 0 deletions timely/src/dataflow/operators/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
//! Extension traits for `Stream` implementing various operators that
//! are independent of specific container types.

pub mod concat;
pub mod exchange;
pub mod feedback;
pub mod inspect;
pub mod probe;
pub mod rc;
pub mod reclock;

pub use concat::{Concat, Concatenate};
pub use exchange::Exchange;
pub use feedback::{Feedback, LoopVariable, ConnectLoop};
pub use inspect::{Inspect, InspectCore};
pub use probe::Probe;
pub use reclock::Reclock;
11 changes: 4 additions & 7 deletions timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
pub use self::enterleave::{Enter, EnterAt, Leave};
pub use self::input::Input;
pub use self::unordered_input::{UnorderedInput, UnorderedInputCore};
pub use self::feedback::{Feedback, LoopVariable, ConnectLoop};
pub use self::concat::{Concat, Concatenate};
pub use self::partition::Partition;
pub use self::map::Map;
pub use self::inspect::{Inspect, InspectCore};
pub use self::filter::Filter;
pub use self::delay::Delay;
pub use self::exchange::Exchange;
pub use self::broadcast::Broadcast;
pub use self::probe::Probe;
pub use self::to_stream::{ToStream, ToStreamCore};
pub use self::capture::Capture;
pub use self::branch::{Branch, BranchWhen};
Expand All @@ -39,21 +36,21 @@ pub mod enterleave;
pub mod input;
pub mod flow_controlled;
pub mod unordered_input;
pub mod feedback;
pub mod concat;
pub use self::core::feedback::{self, Feedback, LoopVariable, ConnectLoop};
pub use self::core::concat::{self, Concat, Concatenate};
pub mod partition;
pub mod map;
pub use self::core::inspect;
pub mod filter;
pub mod delay;
pub use self::core::exchange;
pub mod broadcast;
pub mod probe;
pub use self::core::probe::{self, Probe};
pub mod to_stream;
pub mod capture;
pub mod branch;
pub mod ok_err;
pub mod rc;
pub use self::core::rc;
pub mod result;

pub mod aggregation;
Expand Down
2 changes: 1 addition & 1 deletion timely/tests/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) {
};
timely::execute(config, move |worker| {
worker.dataflow(move |scope| {
let (handle, stream) = scope.feedback::<u64>(1);
let (handle, stream) = scope.feedback::<Vec<usize>>(1);
stream.unary_notify(
Pipeline,
"Barrier",
Expand Down