Skip to content

Commit ff48cbf

Browse files
committed
Repository migrated
1 parent a6c4501 commit ff48cbf

25 files changed

+2089
-2
lines changed

Cargo.toml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[package]
2+
name = "rust-spp"
3+
version = "0.1.0"
4+
authors = ["Ricardo Pieper <ricardopieper@live.com>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
rand = "0.6.5"
9+
lazy_static = "1.3.0"
10+
raster = "0.2.0"
11+
clap = "2.33.0"
12+
num_cpus = "1.0"
13+
rayon = "1.0.3"
14+
time = "0.1.42"
15+
tokio = "0.1.19"
16+
futures = "0.1"
17+
tokio-core = "0.1.17"
18+
parking_lot = "*"
19+
[dev-dependencies]
20+
criterion = "0.2"
21+
22+
[[bench]]
23+
name = "mandelbrot_rustspp"
24+
harness = false
25+
26+
[[bench]]
27+
name = "mandelbrot_pixbypix"
28+
harness = false

README.md

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,42 @@
1-
# rust-ssp
2-
Structured Stream Parallelism for Rust
1+
# Rust SSP #
2+
3+
Structured Stream Parallelism for Rust
4+
5+
Define a pipeline with N steps. Pipelines can be normal pipelines or "farm pipelines" (in which some steps are parallel).
6+
You can also define pipelines with mutable state.
7+
8+
fn pipelined() {
9+
let pipeline = pipeline![
10+
pipeline,
11+
parallel!(LoadImage, 40),
12+
parallel!(ApplyMoreSaturation, 2),
13+
parallel!(ApplyEmboss, 2),
14+
parallel!(ApplyGamma, 2),
15+
parallel!(ApplySharpen, 2),
16+
parallel!(ApplyGrayscale, 2),
17+
parallel!(SaveImageAndGetResult, 40),
18+
sequential!(PrintResult)];
19+
20+
let dir_entries = std::fs::read_dir("/Users/user/Desktop/imagens");
21+
22+
for entry in dir_entries.unwrap() {
23+
let entry = entry.unwrap();
24+
let path = entry.path();
25+
26+
if path.extension().is_none() { continue; }
27+
28+
println!("Posting {:?}", path.to_str().unwrap());
29+
30+
pipeline.post(path).unwrap();
31+
32+
}
33+
34+
pipeline.end_and_wait();
35+
36+
println!("Finished.");
37+
}
38+
39+
40+
# How to Cite our Work
41+
42+
Ricardo Pieper, Dalvan Griebler, and Luiz Gustavo Fernandes. 2019. **Structured Stream Parallelism for Rust.** In Proceedings of the XXIII Brazilian Symposium on Programming Languages (SBLP 2019). ACM, New York, NY, USA, 54-61. DOI: https://doi.org/10.1145/3355378.3355384

benches/mandelbrot_pixbypix.rs

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
2+
#[macro_use]
3+
extern crate criterion;
4+
5+
use criterion::Criterion;
6+
use criterion::ParameterizedBenchmark;
7+
use criterion::Benchmark;
8+
use rayon::prelude::*;
9+
use rayon::ThreadPoolBuilder;
10+
use rust_spp::*;
11+
use std::rc::Rc;
12+
use std::sync::{Arc, Mutex};
13+
use raster::Color;
14+
15+
struct SquareImage {
16+
size: i32,
17+
cur_x: i32,
18+
cur_y: i32
19+
}
20+
21+
impl Iterator for SquareImage {
22+
type Item = (i32, i32);
23+
fn next(&mut self) -> Option<(i32, i32)> {
24+
let ret = (self.cur_x, self.cur_y);
25+
26+
self.cur_y = self.cur_y + 1;
27+
28+
if (self.cur_y == self.size) {
29+
self.cur_x = self.cur_x + 1;
30+
self.cur_y = 0;
31+
}
32+
if (self.cur_x == self.size) {
33+
return None;
34+
}
35+
else {
36+
return Some(ret);
37+
}
38+
}
39+
}
40+
41+
struct Parameters {
42+
init_a: f64,
43+
init_b: f64,
44+
step: f64
45+
}
46+
47+
fn calculate_pixel(x: i32, y: i32, step: f64, init_a: f64, init_b: f64) -> i32 {
48+
let iterations = 10000;
49+
50+
let im = init_b + (step * (y as f64));
51+
let mut a = init_a + (step * (x as f64));
52+
let cr = a;
53+
54+
let mut b = im;
55+
let mut k = 0;
56+
57+
for i in 0.. iterations {
58+
let a2 = a * a;
59+
let b2 = b * b;
60+
if (a2 + b2) > 4.0 { break; }
61+
b = 2.0 * a * b + im;
62+
a = a2 - b2 + cr;
63+
k = i;
64+
}
65+
return k;
66+
}
67+
68+
struct CalculatePixelIterations {
69+
params: Parameters
70+
}
71+
impl InOut<(i32, i32), (i32, i32, i32)> for CalculatePixelIterations {
72+
fn process(&mut self, position: (i32, i32)) -> (i32, i32, i32) {
73+
match position {
74+
(x, y) => {
75+
(x, y, calculate_pixel(x, y, self.params.step, self.params.init_a, self.params.init_b))
76+
}
77+
}
78+
}
79+
}
80+
81+
struct Renderer {
82+
}
83+
impl In<(i32, i32, i32), (usize, usize, u8)> for Renderer {
84+
fn process(&mut self, data: (i32, i32, i32), _order: u64) -> (usize, usize, u8) {
85+
let iterations = 10000;
86+
match data {
87+
(x, y, k) => {
88+
(x as usize, y as usize, (255 as f64 - (((k as f64) * 255 as f64 / (iterations as f64)))) as u8)
89+
}
90+
}
91+
}
92+
}
93+
94+
95+
fn mandelbrot_rustspp(size: usize, threads: i32) {
96+
{
97+
let mut pipeline = pipeline![
98+
parallel!(CalculatePixelIterations {
99+
params: Parameters {
100+
init_a: -2.125,
101+
init_b: -1.5,
102+
step: 3.0 / (size as f64),
103+
}
104+
}, threads),
105+
sequential!(Renderer { })];
106+
107+
for i in 0 .. size {
108+
for j in 0 .. size {
109+
pipeline.post((i as i32, j as i32)).unwrap();
110+
}
111+
}
112+
pipeline.end_and_wait();
113+
}
114+
}
115+
116+
fn mandelbrot_rayon(size: usize, thread_pool: Rc<rayon::ThreadPool>) {
117+
//x: i32, y: i32, step: f64, init_a: f64, init_b: f64
118+
let buf: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(vec![vec![0u8; size]; size]));
119+
120+
let pixels = SquareImage { size: size as i32, cur_x:0, cur_y:0 };
121+
122+
let as_vec: Vec<(i32, i32)> = pixels.into_iter().collect();
123+
124+
let params = Parameters {
125+
init_a: -2.125,
126+
init_b: -1.5,
127+
step: 3.0 / (size as f64),
128+
};
129+
130+
thread_pool.install(|| {
131+
as_vec.into_par_iter()
132+
.map(|(x, y)| (x, y, calculate_pixel(x, y, params.step, params.init_a, params.init_b)))
133+
.for_each(|(x,y,k)| {
134+
let mut buf = buf.lock().unwrap();
135+
let iterations = 10000;
136+
buf[x as usize][y as usize] = (255 as f64 - (((k as f64) * 255 as f64 / (iterations as f64)))) as u8;
137+
});
138+
});
139+
140+
}
141+
142+
fn get_pixel_value(iterations: i32, k: i32) -> u8 {
143+
(255 as f64 - (((k as f64) * 255 as f64 / (iterations as f64)))) as u8
144+
}
145+
146+
type PixelPositionAndValue = (i32,i32,u8);
147+
148+
fn mandelbrot_rayon_collect(size: usize, thread_pool: Rc<rayon::ThreadPool>) {
149+
//x: i32, y: i32, step: f64, init_a: f64, init_b: f64
150+
151+
let pixels = SquareImage { size: size as i32, cur_x:0, cur_y:0 };
152+
153+
let as_vec: Vec<(i32, i32)> = pixels.into_iter().collect();
154+
155+
let params = Parameters {
156+
init_a: -2.125,
157+
init_b: -1.5,
158+
step: 3.0 / (size as f64),
159+
};
160+
161+
thread_pool.install(|| {
162+
let mut buf: Vec<Vec<u8>> = vec![vec![0u8; size]; size];
163+
let mut all_pixels: Vec<PixelPositionAndValue> = vec![(0,0,0); size];
164+
165+
as_vec.into_par_iter()
166+
.map(|(x, y)| (x, y,
167+
get_pixel_value(10000, calculate_pixel(x, y, params.step, params.init_a, params.init_b))))
168+
.collect_into_vec(&mut all_pixels);
169+
170+
for (x, y, pixel) in all_pixels {
171+
buf[x as usize][y as usize] = pixel;
172+
}
173+
});
174+
175+
}
176+
177+
fn criterion_benchmark(c: &mut Criterion) {
178+
179+
let threads_to_run = 4 ..= (num_cpus::get() as i32) * 2;
180+
println!("threads: {:?}", threads_to_run);
181+
c.bench("mandelbrot pixel by pixel comparison",
182+
ParameterizedBenchmark::new(
183+
"rustsupp mandelbrot 1000x1000",
184+
|b, &threads| { b.iter(|| mandelbrot_rustspp(1000, threads)); },
185+
threads_to_run)
186+
.with_function("rayon mandelbrot 1000x1000",
187+
|b, &threads| {
188+
let pool = Rc::new(ThreadPoolBuilder::new().num_threads(threads as usize).build().unwrap());
189+
b.iter(|| mandelbrot_rayon(1000, pool.clone())); })
190+
.sample_size(10));
191+
192+
}
193+
194+
195+
fn rayon_benchmark(c: &mut Criterion) {
196+
197+
let threads_to_run = 1 ..= (num_cpus::get() as i32);
198+
println!("threads: {:?}", threads_to_run);
199+
c.bench("mandelbrot rayon pixel by pixel",
200+
ParameterizedBenchmark::new(
201+
"rayon foreach 1000x1000",
202+
|b, &threads| {
203+
let pool = Rc::new(ThreadPoolBuilder::new().num_threads(threads as usize).build().unwrap());
204+
b.iter(|| mandelbrot_rayon(1000, pool.clone()));
205+
},threads_to_run)
206+
.with_function("rayon collect 1000x1000",
207+
|b, &threads| {
208+
let pool = Rc::new(ThreadPoolBuilder::new().num_threads(threads as usize).build().unwrap());
209+
b.iter(|| mandelbrot_rayon_collect(1000, pool.clone())); })
210+
.sample_size(10));
211+
212+
}
213+
214+
criterion_group!(benches, criterion_benchmark);
215+
//criterion_group!(benches, rayon_benchmark);
216+
criterion_main!(benches);

0 commit comments

Comments
 (0)