Skip to content

Commit 25ba925

Browse files
committed
Reorganize types and traits to deprioritize Vec streams
1 parent c9e9712 commit 25ba925

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+355
-258
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use timely::dataflow::operators::*;
2323
fn main() {
2424
timely::example(|scope| {
2525
(0..10).to_stream(scope)
26+
.container::<Vec<_>>()
2627
.inspect(|x| println!("seen: {:?}", x));
2728
});
2829
}
@@ -66,6 +67,7 @@ fn main() {
6667
// create a new input, exchange data, and inspect its output
6768
worker.dataflow(|scope| {
6869
scope.input_from(&mut input)
70+
.container::<Vec<_>>()
6971
.exchange(|x| *x)
7072
.inspect(move |x| println!("worker {}:\thello {}", index, x))
7173
.probe_with(&mut probe);

mdbook/src/chapter_0/chapter_0_0.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Let's start with what may be the simplest non-trivial timely dataflow program.
55
```rust
66
# extern crate timely;
77

8-
use timely::dataflow::operators::{ToStream, Inspect};
8+
use timely::dataflow::operators::{vec::ToStream, Inspect};
99

1010
timely::example(|scope| {
1111
(0..10).to_stream(scope)

mdbook/src/chapter_0/chapter_0_1.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ timely::execute_from_args(std::env::args(), |worker| {
1919
// create a new input, exchange data, and inspect its output
2020
let probe = worker.dataflow(|scope|
2121
scope.input_from(&mut input)
22+
.container::<Vec<_>>()
2223
.exchange(|x| *x)
2324
.inspect(move |x| println!("worker {}:\thello {}", index, x))
2425
.probe()

mdbook/src/chapter_1/chapter_1_1.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ fn main() {
2525
// create a new input, exchange data, and inspect its output
2626
let probe = worker.dataflow(|scope|
2727
scope.input_from(&mut input)
28+
.container::<Vec<_>>()
2829
.exchange(|x| *x)
2930
.inspect(move |x| println!("worker {}:\thello {}", index, x))
3031
.probe()

mdbook/src/chapter_1/chapter_1_3.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ fn main() {
2020
// create a new input, exchange data, and inspect its output
2121
let probe = worker.dataflow(|scope|
2222
scope.input_from(&mut input)
23+
.container::<Vec<_>>()
2324
.exchange(|x| *x)
2425
.inspect(move |x| println!("worker {}:\thello {}", index, x))
2526
.probe()

mdbook/src/chapter_2/chapter_2.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use timely::dataflow::operators::{ToStream, Inspect};
1414
fn main() {
1515
timely::example(|scope| {
1616
(0..10).to_stream(scope)
17+
.container::<Vec<_>>()
1718
.inspect(|x| println!("seen: {:?}", x));
1819
});
1920
}

mdbook/src/chapter_2/chapter_2_1.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@ fn main() {
1616
// initializes and runs a timely dataflow.
1717
timely::execute_from_args(std::env::args(), |worker| {
1818

19-
let mut input = InputHandle::<(), String>::new();
19+
let mut input = InputHandle::new();
2020

2121
// define a new dataflow
22-
worker.dataflow(|scope| {
23-
24-
let stream1 = input.to_stream(scope);
25-
let stream2 = (0 .. 10).to_stream(scope);
22+
worker.dataflow::<u32,_,_>(|scope| {
2623

24+
let stream1 = input.to_stream(scope).container::<Vec<i32>>();
25+
let stream2 = (0 .. 10).to_stream(scope).container::<Vec<_>>();
2726
});
2827

2928
}).unwrap();

mdbook/src/chapter_2/chapter_2_2.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ fn main() {
1414
worker.dataflow::<(),_,_>(|scope| {
1515
(0 .. 10)
1616
.to_stream(scope)
17+
.container::<Vec<_>>()
1718
.inspect(|x| println!("hello: {}", x));
1819
});
1920
}).unwrap();
@@ -36,6 +37,7 @@ fn main() {
3637
worker.dataflow::<(),_,_>(|scope| {
3738
(0 .. 10)
3839
.to_stream(scope)
40+
.container::<Vec<_>>()
3941
.inspect_batch(|t, xs| println!("hello: {:?} @ {:?}", xs, t));
4042
});
4143
}).unwrap();
@@ -58,8 +60,8 @@ use timely::dataflow::operators::capture::Extract;
5860

5961
fn main() {
6062
let (data1, data2) = timely::example(|scope| {
61-
let data1 = (0..3).to_stream(scope).capture();
62-
let data2 = vec![0,1,2].to_stream(scope).capture();
63+
let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture();
64+
let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture();
6365
(data1, data2)
6466
});
6567

mdbook/src/chapter_2/chapter_2_3.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ The following program should print out the numbers one through ten.
1313
```rust
1414
extern crate timely;
1515

16-
use timely::dataflow::operators::{ToStream, Inspect, Map};
16+
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
1717

1818
fn main() {
1919
timely::execute_from_args(std::env::args(), |worker| {
@@ -32,7 +32,7 @@ The closure `map` takes *owned* data as input, which means you are able to mutat
3232
```rust
3333
extern crate timely;
3434

35-
use timely::dataflow::operators::{ToStream, Inspect, Map};
35+
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
3636

3737
fn main() {
3838
timely::execute_from_args(std::env::args(), |worker| {
@@ -56,7 +56,7 @@ For example, the `map_in_place` method takes a closure which receives a mutable
5656
```rust
5757
extern crate timely;
5858

59-
use timely::dataflow::operators::{ToStream, Inspect, Map};
59+
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
6060

6161
fn main() {
6262
timely::execute_from_args(std::env::args(), |worker| {
@@ -76,7 +76,7 @@ Alternately, the `flat_map` method takes input data and allows your closure to t
7676
```rust
7777
extern crate timely;
7878

79-
use timely::dataflow::operators::{ToStream, Inspect, Map};
79+
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
8080

8181
fn main() {
8282
timely::execute_from_args(std::env::args(), |worker| {
@@ -97,7 +97,7 @@ Another fundamental operation is *filtering*, in which a predicate dictates a su
9797
```rust
9898
extern crate timely;
9999

100-
use timely::dataflow::operators::{ToStream, Inspect, Filter};
100+
use timely::dataflow::operators::{ToStream, Inspect, vec::Filter};
101101

102102
fn main() {
103103
timely::execute_from_args(std::env::args(), |worker| {
@@ -122,7 +122,7 @@ The `partition` operator takes two arguments, a number of resulting streams to p
122122
```rust
123123
extern crate timely;
124124

125-
use timely::dataflow::operators::{ToStream, Partition, Inspect};
125+
use timely::dataflow::operators::{ToStream, Inspect, vec::Partition};
126126

127127
fn main() {
128128
timely::example(|scope| {
@@ -143,7 +143,7 @@ In the other direction, `concat` takes two streams and produces one output strea
143143
```rust
144144
extern crate timely;
145145

146-
use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};
146+
use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::Partition};
147147

148148
fn main() {
149149
timely::example(|scope| {
@@ -163,7 +163,7 @@ There is also a `concatenate` method defined for scopes which collects all strea
163163
```rust
164164
extern crate timely;
165165

166-
use timely::dataflow::operators::{ToStream, Partition, Concatenate, Inspect};
166+
use timely::dataflow::operators::{ToStream, Concatenate, Inspect, vec::Partition};
167167

168168
fn main() {
169169
timely::example(|scope| {

mdbook/src/chapter_2/chapter_2_4.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ fn main() {
1717
timely::example(|scope| {
1818
(0u64..10)
1919
.to_stream(scope)
20+
.container::<Vec<_>>()
2021
.unary(Pipeline, "increment", |capability, info| {
2122

2223
move |input, output| {
@@ -131,6 +132,7 @@ fn main() {
131132
timely::example(|scope| {
132133
(0u64..10)
133134
.to_stream(scope)
135+
.container::<Vec<_>>()
134136
.unary(Pipeline, "increment", |capability, info| {
135137

136138
let mut maximum = 0; // define this here; use in the closure
@@ -179,8 +181,8 @@ use timely::dataflow::channels::pact::Pipeline;
179181
fn main() {
180182
timely::example(|scope| {
181183

182-
let in1 = (0 .. 10).to_stream(scope);
183-
let in2 = (0 .. 10).to_stream(scope);
184+
let in1 = (0 .. 10).to_stream(scope).container::<Vec<_>>();
185+
let in2 = (0 .. 10).to_stream(scope).container::<Vec<_>>();
184186

185187
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
186188

@@ -230,8 +232,8 @@ use timely::dataflow::channels::pact::Pipeline;
230232
fn main() {
231233
timely::example(|scope| {
232234

233-
let in1 = (0 .. 10).to_stream(scope);
234-
let in2 = (0 .. 10).to_stream(scope);
235+
let in1 = (0 .. 10).to_stream(scope).container::<Vec<_>>();
236+
let in2 = (0 .. 10).to_stream(scope).container::<Vec<_>>();
235237

236238
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
237239

0 commit comments

Comments
 (0)