Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions communication/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Serialize, Deserialize};
/// Configuration information about a communication thread.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
pub struct CommunicationSetup {
/// True when this is a send thread (or the receive thread).
/// `true` when this is a send thread (or the receive thread).
pub sender: bool,
/// The process id of the thread.
pub process: usize,
Expand All @@ -25,7 +25,7 @@ pub enum CommunicationEvent {
/// An observed message.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
pub struct MessageEvent {
/// true for send event, false for receive event
/// `true` for send event, `false` for receive event
pub is_send: bool,
/// associated message header.
pub header: crate::networking::MessageHeader,
Expand Down
8 changes: 4 additions & 4 deletions communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ impl MessageHeader {

/// Creates socket connections from a list of host addresses.
///
/// The item at index i in the resulting vec, is a Some(TcpSocket) to process i, except
/// for item `my_index` which is None (no socket to self).
/// The item at index `i` in the resulting vec, is a `Some(TcpSocket)` to process `i`, except
/// for item `my_index` which is `None` (no socket to self).
pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {

let hosts1 = Arc::new(addresses);
Expand All @@ -108,7 +108,7 @@ pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> R
}


/// Result contains connections [0, my_index - 1].
/// Result contains connections `[0, my_index - 1]`.
pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
let results = addresses.iter().take(my_index).enumerate().map(|(index, address)| {
loop {
Expand All @@ -131,7 +131,7 @@ pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bo
Ok(results)
}

/// Result contains connections [my_index + 1, addresses.len() - 1].
/// Result contains connections `[my_index + 1, addresses.len() - 1]`.
pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
let listener = TcpListener::bind(&addresses[my_index][..])?;
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{Container, Data};
pub trait Branch<S: Scope, D: Data> {
/// Takes one input stream and splits it into two output streams.
/// For each record, the supplied closure is called with a reference to
/// the data and its time. If it returns true, the record will be sent
/// the data and its time. If it returns `true`, the record will be sent
/// to the second returned stream, otherwise it will be sent to the first.
///
/// If the result of the closure only depends on the time, not the data,
Expand Down Expand Up @@ -73,7 +73,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
/// Extension trait for `Stream`.
pub trait BranchWhen<T>: Sized {
/// Takes one input stream and splits it into two output streams.
/// For each time, the supplied closure is called. If it returns true,
/// For each time, the supplied closure is called. If it returns `true`,
/// the records for that will be sent to the second returned stream, otherwise
/// they will be sent to the first.
///
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ pub trait Replay<T: Timestamp, C> : Sized {
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C> {
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
}
/// Replays `self` into the provided scope, as a `StreamCore<S, C>'.
/// Replays `self` into the provided scope, as a `StreamCore<S, C>`.
///
/// The `period` argument allows the specification of a re-activation period, where the operator
/// will re-activate itself every so often. The `None` argument instructs the operator not to
/// re-activate itself.us
/// re-activate itself.
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
}

Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/core/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ pub struct Handle<T:Timestamp> {
}

impl<T: Timestamp> Handle<T> {
/// returns true iff the frontier is strictly less than `time`.
/// Returns `true` iff the frontier is strictly less than `time`.
#[inline] pub fn less_than(&self, time: &T) -> bool { self.frontier.borrow().less_than(time) }
/// returns true iff the frontier is less than or equal to `time`.
/// Returns `true` iff the frontier is less than or equal to `time`.
#[inline] pub fn less_equal(&self, time: &T) -> bool { self.frontier.borrow().less_equal(time) }
/// returns true iff the frontier is empty.
/// Returns `true` iff the frontier is empty.
#[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() }
/// Allocates a new handle.
#[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } }
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/flow_controlled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub struct IteratorSourceInput<T: Clone, D: Data, DI: IntoIterator<Item=D>, I: I
}

/// Construct a source that repeatedly calls the provided function to ingest input.
/// - The function can return None to signal the end of the input;
/// - otherwise, it should return a `IteratorSourceInput`, where:
/// - The function can return `None` to signal the end of the input;
/// - otherwise, it should return a [`IteratorSourceInput`], where:
/// * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future,
/// `Default::default()` can be used if this isn't needed (the source will assume that
/// the timestamps in `data` are monotonically increasing and will release capabilities
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/scopes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl<A: Allocate> ScopeParent for crate::worker::Worker<A> {
/// The fundamental operations required to add and connect operators in a timely dataflow graph.
///
/// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method
/// takes a shared reference, but can be thought of as first calling .clone() and then calling the
/// takes a shared reference, but can be thought of as first calling `.clone()` and then calling the
/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics.
pub trait Scope: ScopeParent {
/// A useful name describing the scope.
Expand Down
10 changes: 5 additions & 5 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub enum StartStop {
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Operator start or stop.
pub struct ScheduleEvent {
/// Worker-unique identifier for the operator, linkable to the identifiers in `OperatesEvent`.
/// Worker-unique identifier for the operator, linkable to the identifiers in [`OperatesEvent`].
pub id: usize,
/// `Start` if the operator is starting, `Stop` if it is stopping.
/// activity is true if it looks like some useful work was performed during this call (data was
Expand All @@ -200,7 +200,7 @@ impl ScheduleEvent {
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Operator shutdown.
pub struct ShutdownEvent {
/// Worker-unique identifier for the operator, linkable to the identifiers in `OperatesEvent`.
/// Worker-unique identifier for the operator, linkable to the identifiers in [`OperatesEvent`].
pub id: usize,
}

Expand All @@ -209,21 +209,21 @@ pub struct ShutdownEvent {
pub struct ApplicationEvent {
/// Unique event type identifier
pub id: usize,
/// True when activity begins, false when it stops
/// `true` when activity begins, `false` when it stops
pub is_start: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Application-defined code start or stop
pub struct GuardedMessageEvent {
/// True when activity begins, false when it stops
/// `true` when activity begins, `false` when it stops
pub is_start: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Application-defined code start or stop
pub struct GuardedProgressEvent {
/// True when activity begins, false when it stops
/// `true` when activity begins, `false` when it stops
pub is_start: bool,
}

Expand Down
4 changes: 2 additions & 2 deletions timely/src/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
/// of that trait precludes a distinct `Ord` implementation. We need an independent
/// trait if we want to have a partially ordered type that can also be sorted.
pub trait PartialOrder<Rhs: ?Sized = Self>: PartialEq<Rhs> {
/// Returns true iff one element is strictly less than the other.
/// Returns `true` iff one element is strictly less than the other.
fn less_than(&self, other: &Rhs) -> bool {
self.less_equal(other) && self != other
}
/// Returns true iff one element is less than or equal to the other.
/// Returns `true` iff one element is less than or equal to the other.
fn less_equal(&self, other: &Rhs) -> bool;
}

Expand Down
4 changes: 2 additions & 2 deletions timely/src/progress/change_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<T, const X: usize> ChangeBatch<T, X> {
}
}

/// Returns true if the change batch is not guaranteed compact.
/// Returns `true` if the change batch is not guaranteed compact.
pub fn is_dirty(&self) -> bool {
self.updates.len() > self.clean
}
Expand Down Expand Up @@ -206,7 +206,7 @@ where
self.updates.drain(..)
}

/// True iff all keys have value zero.
/// Returns `true` iff all keys have value zero.
///
/// This method requires mutable access to `self` because it may need to compact the representation
/// to determine if the batch of updates is indeed empty. We could also implement a weaker form of
Expand Down
24 changes: 12 additions & 12 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct Antichain<T> {
impl<T: PartialOrder> Antichain<T> {
/// Updates the `Antichain` if the element is not greater than or equal to some present element.
///
/// Returns true if element is added to the set
/// Returns `true` if element is added to the set
///
/// # Examples
///
Expand All @@ -48,7 +48,7 @@ impl<T: PartialOrder> Antichain<T> {

/// Updates the `Antichain` if the element is not greater than or equal to some present element.
///
/// Returns true if element is added to the set
/// Returns `true` if element is added to the set
///
/// Accepts a reference to an element, which is cloned when inserting.
///
Expand Down Expand Up @@ -76,7 +76,7 @@ impl<T: PartialOrder> Antichain<T> {
/// If the antichain needs updating, it uses the `to_owned` closure to convert the element into
/// a `T`.
///
/// Returns true if element is added to the set
/// Returns `true` if element is added to the set
///
/// # Examples
///
Expand All @@ -103,7 +103,7 @@ impl<T: PartialOrder> Antichain<T> {
self.elements.reserve(additional);
}

/// Performs a sequence of insertion and return true iff any insertion does.
/// Performs a sequence of insertion and returns `true` iff any insertion does.
///
/// # Examples
///
Expand All @@ -123,7 +123,7 @@ impl<T: PartialOrder> Antichain<T> {
added
}

/// Returns true if any item in the antichain is strictly less than the argument.
/// Returns `true` if any item in the antichain is strictly less than the argument.
///
/// # Examples
///
Expand All @@ -143,7 +143,7 @@ impl<T: PartialOrder> Antichain<T> {
self.elements.iter().any(|x| x.less_than(time))
}

/// Returns true if any item in the antichain is less than or equal to the argument.
/// Returns `true` if any item in the antichain is less than or equal to the argument.
///
/// # Examples
///
Expand All @@ -163,7 +163,7 @@ impl<T: PartialOrder> Antichain<T> {
self.elements.iter().any(|x| x.less_equal(time))
}

/// Returns true if every element of `other` is greater or equal to some element of `self`.
/// Returns `true` if every element of `other` is greater or equal to some element of `self`.
#[deprecated(since="0.12.0", note="please use `PartialOrder::less_equal` instead")]
#[inline]
pub fn dominates(&self, other: &Antichain<T>) -> bool {
Expand Down Expand Up @@ -455,7 +455,7 @@ impl<T> MutableAntichain<T> {
}
}

/// Returns true if there are no elements in the `MutableAntichain`.
/// Returns `true` if there are no elements in the `MutableAntichain`.
///
/// # Examples
///
Expand All @@ -470,7 +470,7 @@ impl<T> MutableAntichain<T> {
self.frontier.is_empty()
}

/// Returns true if any item in the `MutableAntichain` is strictly less than the argument.
/// Returns `true` if any item in the `MutableAntichain` is strictly less than the argument.
///
/// # Examples
///
Expand All @@ -490,7 +490,7 @@ impl<T> MutableAntichain<T> {
self.frontier().less_than(time)
}

/// Returns true if any item in the `MutableAntichain` is less than or equal to the argument.
/// Returns `true` if any item in the `MutableAntichain` is less than or equal to the argument.
///
/// # Examples
///
Expand Down Expand Up @@ -714,7 +714,7 @@ impl<'a, T: 'a> AntichainRef<'a, T> {

impl<T> AntichainRef<'_, T> {

/// Returns true if any item in the `AntichainRef` is strictly less than the argument.
/// Returns `true` if any item in the `AntichainRef` is strictly less than the argument.
///
/// # Examples
///
Expand All @@ -731,7 +731,7 @@ impl<T> AntichainRef<'_, T> {
self.iter().any(|x| x.less_than(time))
}

/// Returns true if any item in the `AntichainRef` is less than or equal to the argument.
/// Returns `true` if any item in the `AntichainRef` is less than or equal to the argument.
#[inline]
///
/// # Examples
Expand Down
2 changes: 1 addition & 1 deletion timely/src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod subgraph;
pub struct Location {
/// A scope-local operator identifier.
pub node: usize,
/// An operator port identifier.`
/// An operator port identifier.
pub port: Port,
}

Expand Down
6 changes: 3 additions & 3 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,12 @@ impl<T: Timestamp> PortInformation<T> {
}
}

/// True if updates at this pointstamp uniquely block progress.
/// Returns `true` if updates at this pointstamp uniquely block progress.
///
/// This method returns true if the currently maintained pointstamp
/// This method returns `true` if the currently maintained pointstamp
/// counts are such that zeroing out outstanding updates at *this*
/// pointstamp would change the frontiers at this operator. When the
/// method returns false it means that, temporarily at least, there
/// method returns `false` it means that, temporarily at least, there
/// are outstanding pointstamp updates that are strictly less than
/// this pointstamp.
#[inline]
Expand Down
4 changes: 2 additions & 2 deletions timely/src/progress/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub trait PathSummary<T> : Clone+'static+Eq+PartialOrder+Debug+Default {
///
/// It is possible that the two composed paths result in an invalid summary, for example when
/// integer additions overflow. If it is correct that all timestamps moved along these paths
/// would also result in overflow and be discarded, `followed_by` can return `None. It is very
/// would also result in overflow and be discarded, `followed_by` can return `None`. It is very
/// important that this not be used casually, as this does not prevent the actual movement of
/// data.
///
Expand All @@ -67,7 +67,7 @@ impl PathSummary<()> for () {
#[inline] fn followed_by(&self, _other: &()) -> Option<()> { Some(()) }
}

/// Implements Timestamp and PathSummary for types with a `checked_add` method.
/// Implements [`Timestamp`] and [`PathSummary`] for types with a `checked_add` method.
macro_rules! implement_timestamp_add {
($($index_type:ty,)*) => (
$(
Expand Down
6 changes: 3 additions & 3 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl<A: Allocate> Worker<A> {
!self.dataflows.borrow().is_empty()
}

/// Calls `self.step()` as long as `func` evaluates to true.
/// Calls `self.step()` as long as `func` evaluates to `true`.
///
/// This method will continually execute even if there is not work
/// for the worker to perform. Consider using the similar method
Expand Down Expand Up @@ -441,7 +441,7 @@ impl<A: Allocate> Worker<A> {
self.step_or_park_while(Some(Duration::from_secs(0)), func)
}

/// Calls `self.step_or_park(duration)` as long as `func` evaluates to true.
/// Calls `self.step_or_park(duration)` as long as `func` evaluates to `true`.
///
/// This method may yield whenever there is no work to perform, as performed
/// by `Self::step_or_park()`. Please consult the documentation for further
Expand Down Expand Up @@ -699,7 +699,7 @@ impl<A: Allocate> Worker<A> {
self.dataflows.borrow().keys().cloned().collect()
}

/// True if there is at least one dataflow under management.
/// Returns `true` if there is at least one dataflow under management.
pub fn has_dataflows(&self) -> bool {
!self.dataflows.borrow().is_empty()
}
Expand Down