Skip to content

Improvement of parallel_for implementation #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,26 @@ let parallel_for_reduce pool reduce_fun init ~chunk_size ~start ~finish ~body =
let results = List.map (await pool) ps in
List.fold_left reduce_fun init results

let parallel_for pool ~chunk_size ~start ~finish ~body =
assert (chunk_size > 0);
let work s e =
for i=s to e do
body i
done
let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
let chunk_size = if chunk_size > 0 then chunk_size
else begin
let n_domains = (Array.length pool.domains) + 1 in
let n_tasks = finish - start + 1 in
if n_domains = 1 then n_tasks
else max 1 (n_tasks/(8*n_domains))
end
in
let rec loop i acc =
if i+chunk_size > finish then
let p = async pool (fun _ -> work i finish) in
p::acc
let rec work pool fn s e =
if e - s < chunk_size then
for i = s to e do fn i done
else begin
let p = async pool (fun _ -> work i (i+chunk_size-1)) in
loop (i+chunk_size) (p::acc)
let d = s + ((e - s) / 2) in
let left = async pool (fun _ -> work pool fn s d) in
work pool fn (d+1) e;
await pool left
end
in
let ps = loop start [] in
List.iter (await pool) ps
work pool body start finish

let parallel_scan pool op elements =

Expand Down
15 changes: 9 additions & 6 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ val await : pool -> 'a promise -> 'a
* be returned. If the task had raised an exception, then [await] raises the
* same exception. *)

val parallel_for : pool -> chunk_size:int -> start:int -> finish:int ->
body:(int -> unit) -> unit
(** [parallel_for p c s f b] behaves similar to [for i=s to f do b i done], but
* runs the for loop in parallel. The chunk size [c] determines the
* granularity of parallelisation. Individual iterates may be run in any
* order. *)
val parallel_for: ?chunk_size:int -> start:int -> finish:int ->
body:(int -> unit) -> pool -> unit
(** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but
* runs the for loop in parallel. The chunk size [c] determines the number of
* body applications done in one task; this will default to
* [(finish-start + 1) / (8 * num_domains)]. Individual iterates may be run
* in any order. Tasks are distributed to workers using a divide-and-conquer
* scheme.
*)

val parallel_for_reduce : pool -> ('a -> 'a -> 'a) -> 'a -> chunk_size:int ->
start:int -> finish:int -> body:(int -> 'a) -> 'a
Expand Down
28 changes: 26 additions & 2 deletions test/sum_par.ml
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
let num_domains = try int_of_string Sys.argv.(1) with _ -> 1
let num_domains = try int_of_string Sys.argv.(1) with _ -> 2
let n = try int_of_string Sys.argv.(2) with _ -> 40

module T = Domainslib.Task

let _ =
(* use parallel_for_reduce *)
let p = T.setup_pool ~num_domains:(num_domains - 1) in
let sum =
T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0
~finish:(n-1) ~body:(fun _i -> 1)
in
T.teardown_pool p;
Printf.printf "Sum is %d\n" sum
Printf.printf "Sum is %d\n" sum;
assert (sum = n)

let _ =
(* explictly use empty pool and default chunk_size *)
let p = T.setup_pool ~num_domains:0 in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
let sum = Atomic.get sum in
T.teardown_pool p;
Printf.printf "Sum is %d\n" sum;
assert (sum = n)

let _ =
(* configured num_domains and default chunk_size *)
let p = T.setup_pool ~num_domains:(num_domains - 1) in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
let sum = Atomic.get sum in
T.teardown_pool p;
Printf.printf "Sum is %d\n" sum;
assert (sum = n)