Skip to content

Use same algorithm for parallel_for_reduce as parallel_for #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,31 @@ let teardown_pool pool =
done;
Array.iter Domain.join pool.domains

let parallel_for_reduce pool reduce_fun init ~chunk_size ~start ~finish ~body =
assert (chunk_size > 0);
let work s e =
let rec loop i acc =
if i > e then acc
else loop (i+1) (reduce_fun acc (body i))
in
loop s init
let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init =
let chunk_size = if chunk_size > 0 then chunk_size
else begin
let n_domains = (Array.length pool.domains) + 1 in
let n_tasks = finish - start + 1 in
if n_domains = 1 then n_tasks
else max 1 (n_tasks/(8*n_domains))
end
in
let rec loop i acc =
if i+chunk_size > finish then
let p = async pool (fun _ -> work i finish) in
p::acc
let rec work s e =
if e - s < chunk_size then
let rec loop i acc =
if i > e then acc
else loop (i+1) (reduce_fun acc (body i))
in
loop s init
else begin
let p = async pool (fun _ -> work i (i+chunk_size-1)) in
loop (i+chunk_size) (p::acc)
let d = s + ((e - s) / 2) in
let p = async pool (fun _ -> work s d) in
let right = work (d+1) e in
let left = await pool p in
reduce_fun left right
end
in
let ps = loop start [] in
let results = List.map (await pool) ps in
List.fold_left reduce_fun init results
work start finish

let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
let chunk_size = if chunk_size > 0 then chunk_size
Expand Down
8 changes: 4 additions & 4 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ val await : pool -> 'a promise -> 'a
* be returned. If the task had raised an exception, then [await] raises the
* same exception. *)

val parallel_for: ?chunk_size:int -> start:int -> finish:int ->
val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
body:(int -> unit) -> pool -> unit
(** [parallel_for c s f b p] behaves similarly to [for i=s to f do b i done], but
* runs the for loop in parallel. The chunk size [c] determines the number of
Expand All @@ -37,9 +37,9 @@ val parallel_for: ?chunk_size:int -> start:int -> finish:int ->
* scheme.
*)

val parallel_for_reduce : pool -> ('a -> 'a -> 'a) -> 'a -> chunk_size:int ->
start:int -> finish:int -> body:(int -> 'a) -> 'a
(** [parallel_for_reduce p r i c s f b] is similar to [parallel_for] except
val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
body:(int -> 'a) -> pool -> ('a -> 'a -> 'a) -> 'a -> 'a
(** [parallel_for_reduce c s f b p r i] is similar to [parallel_for] except
* that the results returned by the iterations are combined using [r], starting
* from the initial value [i]. *)

Expand Down