Skip to content

Move to Mutex & Condition from Domain.Sync.{notify/wait} #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ run_test:
dune exec test/spectralnorm2_multicore.exe 1 2000
dune exec test/sum_par.exe 1 100
dune exec test/task_exn.exe
dune exec test/LU_decomposition_multicore.exe 1 512

clean:
dune clean
121 changes: 84 additions & 37 deletions lib/chan.ml
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
(* mutex_condvar will be used per domain; so multiple fibers or
systhreads may share a mutex_condvar variable *)
type mutex_condvar = {
mutex: Mutex.t;
condition: Condition.t
}

type waiting_notified =
| Waiting
| Notified

type 'a contents =
| Empty of {receivers: ('a option ref * Domain.id) Fun_queue.t}
| NotEmpty of {senders: ('a * Domain.id) Fun_queue.t; messages: 'a Fun_queue.t}
| Empty of {receivers: ('a option ref * mutex_condvar) Fun_queue.t}
| NotEmpty of {senders: ('a * waiting_notified ref * mutex_condvar) Fun_queue.t; messages: 'a Fun_queue.t}

type 'a t = {buffer_size: int option; contents: 'a contents Atomic.t}
type 'a t = {
buffer_size: int option;
contents: 'a contents Atomic.t
}

let mutex_condvar_key =
Domain.DLS.new_key (fun () ->
let m = Mutex.create () in
let c = Condition.create () in
{mutex=m; condition=c})

let make_bounded n =
if n < 0 then raise (Invalid_argument "Chan.make_bounded") ;
{buffer_size= Some n; contents = Atomic.make (Empty {receivers= Fun_queue.empty})}
{buffer_size= Some n;
contents = Atomic.make (Empty {receivers= Fun_queue.empty; })}

let make_unbounded () =
{buffer_size= None; contents = Atomic.make (Empty {receivers= Fun_queue.empty})}
{buffer_size= None;
contents = Atomic.make (Empty {receivers= Fun_queue.empty})}

(* [send'] is shared by both the blocking and polling versions. Returns a
* boolean indicating whether the send was successful. Hence, it always returns
Expand All @@ -30,13 +52,21 @@ let send' {buffer_size; contents} v ~polling =
begin if not polling then begin
(* The channel is empty (no senders), no waiting receivers,
* buffer size is 0 and we're not polling *)
let mc = Domain.DLS.get mutex_condvar_key in
let cond_slot = ref Waiting in
let new_contents =
NotEmpty
{messages= empty; senders= push empty (v, Domain.self ())}
{messages= empty; senders= push empty (v, cond_slot, mc)}
in
if Atomic.compare_and_set contents old_contents new_contents
then (Domain.Sync.wait (); true)
else loop ()
then begin
Mutex.lock mc.mutex;
while !cond_slot = Waiting do
Condition.wait mc.condition mc.mutex
done;
Mutex.unlock mc.mutex;
true
end else loop ()
end else
(* The channel is empty (no senders), no waiting receivers,
* buffer size is 0 and we're polling *)
Expand All @@ -51,25 +81,18 @@ let send' {buffer_size; contents} v ~polling =
if Atomic.compare_and_set contents old_contents new_contents
then true
else loop ()
| Some ((r, d), receivers') ->
| Some ((r, mc), receivers') ->
(* The channel is empty (no senders) and there are waiting receivers
* *)
let new_contents = Empty {receivers= receivers'} in
if Atomic.compare_and_set contents old_contents new_contents
then (
then begin
r := Some v;
(* Notifying another domain from within a critical section is unsafe
* in general. Notify blocks until the target domain is out of the
* critical section. If two domains are notifying each other from
* within critical section, then the program deadlocks. However,
* here (and other uses of notify in send' and recv' in the channel
* implementation), there is no possibility of other domains
* notifying this domain; only a blocked domain will be notified,
* and this domain is currently running. Hence, it is ok to notify
* from within the critical section. *)
Domain.Sync.notify d;
true )
else loop ()
Mutex.lock mc.mutex;
Mutex.unlock mc.mutex;
Condition.broadcast mc.condition;
true
end else loop ()
end
| NotEmpty {senders; messages} ->
(* The channel is not empty *)
Expand All @@ -78,12 +101,19 @@ let send' {buffer_size; contents} v ~polling =
begin if not polling then
(* The channel is not empty, the buffer is full and we're not
* polling *)
let cond_slot = ref Waiting in
let mc = Domain.DLS.get mutex_condvar_key in
let new_contents =
NotEmpty {senders= push senders (v, Domain.self ()); messages}
NotEmpty {senders= push senders (v, cond_slot, mc); messages}
in
if Atomic.compare_and_set contents old_contents new_contents then
( Domain.Sync.wait () ; true )
else loop ()
if Atomic.compare_and_set contents old_contents new_contents then begin
Mutex.lock mc.mutex;
while !cond_slot = Waiting do
Condition.wait mc.condition mc.mutex;
done;
Mutex.unlock mc.mutex;
true
end else loop ()
else
(* The channel is not empty, the buffer is full and we're
* polling *)
Expand All @@ -98,7 +128,7 @@ let send' {buffer_size; contents} v ~polling =
then true
else loop ()
in
Domain.Sync.critical_section loop
loop ()

let send c v =
let r = send' c v ~polling:false in
Expand All @@ -119,12 +149,19 @@ let recv' {buffer_size; contents} ~polling =
if not polling then begin
(* The channel is empty (no senders), and we're not polling *)
let msg_slot = ref None in
let mc = Domain.DLS.get mutex_condvar_key in
let new_contents =
Empty {receivers= push receivers (msg_slot, Domain.self ())}
Empty {receivers= push receivers (msg_slot, mc)}
in
if Atomic.compare_and_set contents old_contents new_contents then
(Domain.Sync.wait (); !msg_slot)
else loop ()
begin
Mutex.lock mc.mutex;
while !msg_slot = None do
Condition.wait mc.condition mc.mutex;
done;
Mutex.unlock mc.mutex;
!msg_slot
end else loop ()
end else
(* The channel is empty (no senders), and we're polling *)
None
Expand All @@ -146,7 +183,7 @@ let recv' {buffer_size; contents} ~polling =
if Atomic.compare_and_set contents old_contents new_contents
then Some m
else loop ()
| None, Some ((m, s), senders') ->
| None, Some ((m, c, mc), senders') ->
(* The channel is not empty, there are no messages, and there
* is a waiting sender. This is only possible is the buffer
* size is 0. *)
Expand All @@ -158,19 +195,29 @@ let recv' {buffer_size; contents} ~polling =
NotEmpty {messages; senders= senders'}
in
if Atomic.compare_and_set contents old_contents new_contents
then (Domain.Sync.notify s; Some m)
else loop ()
| Some (m, messages'), Some ((ms, s), senders') ->
then begin
c := Notified;
Mutex.lock mc.mutex;
Mutex.unlock mc.mutex;
Condition.broadcast mc.condition;
Some m
end else loop ()
| Some (m, messages'), Some ((ms, sc, mc), senders') ->
(* The channel is not empty, there is a message, and there is a
* waiting sender. *)
let new_contents =
NotEmpty {messages= push messages' ms; senders= senders'}
in
if Atomic.compare_and_set contents old_contents new_contents
then (Domain.Sync.notify s; Some m)
else loop ()
then begin
sc := Notified;
Mutex.lock mc.mutex;
Mutex.unlock mc.mutex;
Condition.broadcast mc.condition;
Some m
end else loop ()
in
Domain.Sync.critical_section loop
loop ()

let recv c =
match recv' c ~polling:false with
Expand Down
63 changes: 63 additions & 0 deletions test/LU_decomposition_multicore.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
module T = Domainslib.Task
let num_domains = try int_of_string Sys.argv.(1) with _ -> 1
let mat_size = try int_of_string Sys.argv.(2) with _ -> 1200

let k = Domain.DLS.new_key Random.State.make_self_init

module SquareMatrix = struct

let create f : float array =
let fa = Array.create_float (mat_size * mat_size) in
for i = 0 to mat_size * mat_size - 1 do
fa.(i) <- f (i / mat_size) (i mod mat_size)
done;
fa
let parallel_create pool f : float array =
let fa = Array.create_float (mat_size * mat_size) in
T.parallel_for pool ~start:0 ~finish:( mat_size * mat_size - 1)
~body:(fun i -> fa.(i) <- f (i / mat_size) (i mod mat_size));
fa

let get (m : float array) r c = m.(r * mat_size + c)
let set (m : float array) r c v = m.(r * mat_size + c) <- v
let parallel_copy pool a =
let n = Array.length a in
let copy_part a b i =
let s = (i * n / num_domains) in
let e = (i+1) * n / num_domains - 1 in
Array.blit a s b s (e - s + 1) in
let b = Array.create_float n in
let rec aux acc num_domains i =
if (i = num_domains) then
(List.iter (fun e -> T.await pool e) acc)
else begin
aux ((T.async pool (fun _ -> copy_part a b i))::acc) num_domains (i+1)
end
in
aux [] num_domains 0;
b
end

open SquareMatrix

let lup pool (a0 : float array) =
let a = parallel_copy pool a0 in
for k = 0 to (mat_size - 2) do
T.parallel_for pool ~start:(k + 1) ~finish:(mat_size -1)
~body:(fun row ->
let factor = get a row k /. get a k k in
for col = k + 1 to mat_size-1 do
set a row col (get a row col -. factor *. (get a k col))
done;
set a row k factor )
done ;
a

let () =
let pool = T.setup_pool ~num_domains:(num_domains - 1) in
let a = parallel_create pool
(fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in
let lu = lup pool a in
let _l = parallel_create pool (fun i j -> if i > j then get lu i j else if i = j then 1.0 else 0.0) in
let _u = parallel_create pool (fun i j -> if i <= j then get lu i j else 0.0) in
T.teardown_pool pool
8 changes: 8 additions & 0 deletions test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@
(modules game_of_life_multicore)
(modes native))

(test
(name LU_decomposition_multicore)
(libraries domainslib)
(flags (:standard -runtime-variant d))
(modules LU_decomposition_multicore)
(modes native))


(test
(name spectralnorm2)
(modules spectralnorm2)
Expand Down