Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 62 additions & 48 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait Container: Default + Clone + 'static {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the continer.
/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Push `item` into self
Expand Down Expand Up @@ -63,6 +63,16 @@ pub trait Container: Default + Clone + 'static {
fn drain(&mut self) -> Self::DrainIter<'_>;
}

/// A container that can be sized and reveals its capacity.
pub trait SizableContainer: Container {
/// Return the capacity of the container.
fn capacity(&self) -> usize;
/// Return the preferred capacity of the container.
fn preferred_capacity() -> usize;
/// Reserve space for `additional` elements, possibly increasing the capacity of the container.
fn reserve(&mut self, additional: usize);
}

/// A container that can absorb items of a specific type.
pub trait PushInto<T> {
/// Push item into self.
Expand All @@ -75,36 +85,43 @@ pub trait PushInto<T> {
/// chunked into individual containers, but is free to change the data representation to
/// better fit the properties of the container.
///
/// Types implementing this trait should provide appropriate [`PushInto`] implementations such
/// that users can push the expected item types.
///
/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
/// needs to produce all outputs, even partial ones.
/// needs to produce all outputs, even partial ones. Caller should repeatedly call the functions
/// to drain pending or finished data.
///
/// The caller should consume the containers returned by [`Self::extract`] and
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
/// any remaining elements.
///
/// For example, a consolidating builder can aggregate differences in-place, but it has
/// to ensure that it preserves the intended information.
///
/// The trait does not prescribe any specific ordering guarantees, and each implementation can
/// decide to represent a `push`/`push_container` order for `extract` and `finish`, or not.
// TODO: Consider adding `push_iterator` to receive an iterator of data.
/// decide to represent a push order for `extract` and `finish`, or not.
pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container;
/// Add an item to a container.
/// Extract assembled containers, potentially leaving unfinished data behind. Can
/// be called repeatedly, for example while the caller can send data.
///
/// The restriction to [`SizeableContainer`] only exists so that types
/// relying on [`CapacityContainerBuilder`] only need to constrain their container
/// to [`Container`] instead of [`SizableContainer`], which otherwise would be a pervasive
/// requirement.
fn push<T>(&mut self, item: T) where Self::Container: SizableContainer + PushInto<T>;
/// Push a pre-built container.
fn push_container(&mut self, container: &mut Self::Container);
/// Extract assembled containers, potentially leaving unfinished data behind.
/// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
#[must_use]
fn extract(&mut self) -> Option<&mut Self::Container>;
/// Extract assembled containers and any unfinished data.
/// Extract assembled containers and any unfinished data. Should
/// be called repeatedly until it returns `None`.
#[must_use]
fn finish(&mut self) -> Option<&mut Self::Container>;
}

/// A default container builder that uses length and preferred capacity to chunk data.
///
/// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not
/// across [`Self::finish`] to maintain a low memory footprint.
///
/// Maintains FIFO order.
#[derive(Default, Debug)]
pub struct CapacityContainerBuilder<C>{
Expand All @@ -116,21 +133,9 @@ pub struct CapacityContainerBuilder<C>{
pending: VecDeque<C>,
}

/// A container that can be sized and reveals its capacity.
pub trait SizableContainer: Container {
/// Return the capacity of the container.
fn capacity(&self) -> usize;
/// Return the preferred capacity of the container.
fn preferred_capacity() -> usize;
/// Reserve space for `additional` elements, possibly increasing the capacity of the container.
fn reserve(&mut self, additional: usize);
}

impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
#[inline]
fn push<T>(&mut self, item: T) where C: SizableContainer + PushInto<T> {
fn push_into(&mut self, item: T) {
if self.current.capacity() == 0 {
self.current = self.empty.take().unwrap_or_default();
// Discard any non-uniform capacity container.
Expand All @@ -153,23 +158,10 @@ impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
self.pending.push_back(std::mem::take(&mut self.current));
}
}
}

#[inline]
fn push_container(&mut self, container: &mut Self::Container) {
if !container.is_empty() {
// Flush to maintain FIFO ordering.
if self.current.len() > 0 {
self.pending.push_back(std::mem::take(&mut self.current));
}

let mut empty = self.empty.take().unwrap_or_default();
// Ideally, we'd discard non-uniformly sized containers, but we don't have
// access to `len`/`capacity` of the container.
empty.clear();

self.pending.push_back(std::mem::replace(container, empty));
}
}
impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
fn extract(&mut self) -> Option<&mut C> {
Expand All @@ -183,10 +175,32 @@ impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {

#[inline]
fn finish(&mut self) -> Option<&mut C> {
if self.current.len() > 0 {
if !self.current.is_empty() {
self.pending.push_back(std::mem::take(&mut self.current));
}
self.extract()
self.empty = self.pending.pop_front();
self.empty.as_mut()
}
}

impl<C: Container> CapacityContainerBuilder<C> {
/// Push a pre-formed container at this builder. This exists to maintain
/// API compatibility.
#[inline]
pub fn push_container(&mut self, container: &mut C) {
if !container.is_empty() {
// Flush to maintain FIFO ordering.
if self.current.len() > 0 {
self.pending.push_back(std::mem::take(&mut self.current));
}

let mut empty = self.empty.take().unwrap_or_default();
// Ideally, we'd discard non-uniformly sized containers, but we don't have
// access to `len`/`capacity` of the container.
empty.clear();

self.pending.push_back(std::mem::replace(container, empty));
}
}
}

Expand Down Expand Up @@ -347,7 +361,7 @@ pub trait PushPartitioned: SizableContainer {
F: FnMut(usize, &mut Self);
}

impl<T: SizableContainer> PushPartitioned for T where for<'a> T: PushInto<T::Item<'a>> {
impl<C: SizableContainer> PushPartitioned for C where for<'a> C: PushInto<C::Item<'a>> {
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
for<'a> I: FnMut(&Self::Item<'a>) -> usize,
Expand Down Expand Up @@ -383,7 +397,7 @@ pub mod buffer {
/// The maximum buffer capacity in elements. Returns a number between [BUFFER_SIZE_BYTES]
/// and 1, inclusively.
pub const fn default_capacity<T>() -> usize {
let size = ::std::mem::size_of::<T>();
let size = std::mem::size_of::<T>();
if size == 0 {
BUFFER_SIZE_BYTES
} else if size <= BUFFER_SIZE_BYTES {
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<T, C: Container> Message<T, C> {
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element.
/// leaves in place, or the container's default element. The buffer is cleared.
#[inline]
pub fn push_at<P: Push<Bundle<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

Expand Down
96 changes: 61 additions & 35 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! with the performance of batched sends.

use crate::communication::Push;
use crate::container::{ContainerBuilder, CapacityContainerBuilder, SizableContainer, PushInto};
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
Expand Down Expand Up @@ -56,6 +56,14 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<
pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSession<T, CapacityContainerBuilder<C>, P> where T: Timestamp {
self.autoflush_session_with_builder(cap)
}

/// Gives an entire container at the current time.
fn give_container(&mut self, container: &mut C) {
if !container.is_empty() {
self.builder.push_container(container);
self.extract_and_send();
}
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
Expand Down Expand Up @@ -86,7 +94,7 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P

/// Extract pending data from the builder, but not forcing a flush.
#[inline]
fn extract(&mut self) {
fn extract_and_send(&mut self) {
while let Some(container) = self.builder.extract() {
let time = self.time.as_ref().unwrap().clone();
Message::push_at(container, time, &mut self.pusher);
Expand All @@ -101,26 +109,18 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P
Message::push_at(container, time, &mut self.pusher);
}
}

/// Gives an entire container at the current time.
fn give_container(&mut self, container: &mut CB::Container) {
if !container.is_empty() {
self.builder.push_container(container);
self.extract();
}
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P>
impl<T, CB, P, D> PushInto<D> for Buffer<T, CB, P>
where
T: Eq+Clone,
CB::Container: SizableContainer,
CB: ContainerBuilder + PushInto<D>,
P: Push<Bundle<T, CB::Container>>
{
// Push a single item into the builder. Internal method for use by `Session`.
#[inline]
fn give<D>(&mut self, data: D) where CB::Container: PushInto<D> {
self.builder.push(data);
self.extract();
fn push_into(&mut self, item: D) {
self.builder.push_into(item);
self.extract_and_send();
}
}

Expand All @@ -133,49 +133,60 @@ pub struct Session<'a, T, CB, P> {
buffer: &'a mut Buffer<T, CB, P>,
}

impl<'a, T, CB, P> Session<'a, T, CB, P>
impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder<C>, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a
P: Push<Bundle<T, C>> + 'a,
{
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut CB::Container) {
pub fn give_container(&mut self, container: &mut C) {
self.buffer.give_container(container)
}
}

impl<'a, T, CB, P> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a
{
/// Access the builder. Immutable access to prevent races with flushing
/// the underlying buffer.
pub fn builder(&self) -> &CB {
self.buffer.builder()
}
}

impl<'a, T, CB, P: Push<Bundle<T, CB::Container>>+'a> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
CB::Container: SizableContainer,
{
/// Provides one record at the time specified by the `Session`.
#[inline]
pub fn give<D>(&mut self, data: D) where CB::Container: PushInto<D> {
self.buffer.give(data);
pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
self.push_into(data);
}

/// Provides an iterator of records at the time specified by the `Session`.
#[inline]
pub fn give_iterator<I>(&mut self, iter: I)
where
I: Iterator,
CB::Container: PushInto<I::Item>,
CB: PushInto<I::Item>,
{
for item in iter {
self.give(item);
self.push_into(item);
}
}
}

impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
self.buffer.push_into(item);
}
}

/// A session which will flush itself when dropped.
pub struct AutoflushSession<'a, T, CB, P>
where
Expand All @@ -197,21 +208,36 @@ where
{
/// Transmits a single record.
#[inline]
pub fn give<D>(&mut self, data: D) where CB::Container: SizableContainer + PushInto<D> {
self.buffer.give(data);
pub fn give<D>(&mut self, data: D)
where
CB: PushInto<D>,
{
self.push_into(data);
}

/// Transmits records produced by an iterator.
#[inline]
pub fn give_iterator<I, D>(&mut self, iter: I)
where
I: Iterator<Item=D>,
CB::Container: SizableContainer + PushInto<D>,
CB: PushInto<D>,
{
for item in iter {
self.give(item);
self.push_into(item);
}
}
}
impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
self.buffer.push_into(item);
}
}

impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
where
Expand Down
Loading