Skip to content

Commit 07b0f18

Browse files
committed
WIP: Add domain local await support
1 parent 446d457 commit 07b0f18

File tree

5 files changed

+79
-2
lines changed

5 files changed

+79
-2
lines changed

lib/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
(library
22
(name domainslib)
33
(public_name domainslib)
4-
(libraries lockfree))
4+
(libraries lockfree kcas))

lib/multi_channel.ml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,11 @@ type dls_state = {
3232
mc: mutex_condvar;
3333
}
3434

35+
module Foreign_queue = Lockfree.Michael_scott_queue
36+
3537
type 'a t = {
3638
channels: 'a Ws_deque.t array;
39+
foreign_queue: 'a Foreign_queue.t;
3740
waiters: (waiting_status ref * mutex_condvar ) Chan.t;
3841
next_domain_id: int Atomic.t;
3942
recv_block_spins: int;
@@ -54,6 +57,7 @@ let rec log2 n =
5457

5558
let make ?(recv_block_spins = 2048) n =
5659
{ channels = Array.init n (fun _ -> Ws_deque.create ());
60+
foreign_queue = Foreign_queue.create ();
5761
waiters = Chan.make_unbounded ();
5862
next_domain_id = Atomic.make 0;
5963
recv_block_spins;
@@ -109,6 +113,10 @@ let rec check_waiters mchan =
109113
end
110114
end
111115

116+
let send_foreign mchan v =
117+
Foreign_queue.push mchan.foreign_queue v;
118+
check_waiters mchan
119+
112120
let send mchan v =
113121
let id = (get_local_state mchan).id in
114122
Ws_deque.push (Array.unsafe_get mchan.channels id) v;
@@ -137,7 +145,10 @@ let recv_poll_with_dls mchan dls =
137145
try
138146
Ws_deque.pop (Array.unsafe_get mchan.channels dls.id)
139147
with
140-
| Exit -> recv_poll_loop mchan dls 0
148+
| Exit ->
149+
match Foreign_queue.pop mchan.foreign_queue with
150+
| None -> recv_poll_loop mchan dls 0
151+
| Some v -> v
141152
[@@inline]
142153

143154
let recv_poll mchan =

lib/task.ml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
open Effect
22
open Effect.Deep
3+
module Domain_local_await = Kcas.Domain_local_await
34

45
type 'a task = unit -> 'a
56

@@ -80,11 +81,36 @@ let async pool f =
8081
Multi_channel.send pd.task_chan (Work (fun _ -> step (do_task f) p));
8182
p
8283

84+
let prepare_for_await chan _mode =
85+
(* Cancellation is not supported, so mode can be ignored. *)
86+
let promise = Atomic.make (Pending []) in
87+
let release () =
88+
match Atomic.get promise with
89+
| (Returned _ | Raised _) -> ()
90+
| Pending _ ->
91+
match Atomic.exchange promise (Returned ()) with
92+
| Pending ks ->
93+
ks
94+
|> List.iter @@ fun (k, c) ->
95+
Multi_channel.send_foreign c (Work (fun _ -> continue k ()))
96+
| _ -> ()
97+
and await () =
98+
match Atomic.get promise with
99+
| (Returned _ | Raised _) -> ()
100+
| Pending _ -> perform (Wait (promise, chan))
101+
in
102+
Domain_local_await.{ release; await }
103+
83104
let rec worker task_chan =
84105
match Multi_channel.recv task_chan with
85106
| Quit -> Multi_channel.clear_local_state task_chan
86107
| Work f -> f (); worker task_chan
87108

109+
let worker task_chan =
110+
Domain_local_await.using
111+
~prepare_for_await:(prepare_for_await task_chan)
112+
~while_running:(fun () -> worker task_chan)
113+
88114
let run (type a) pool (f : unit -> a) : a =
89115
let pd = get_pool_data pool in
90116
let p = Atomic.make (Pending []) in
@@ -105,6 +131,11 @@ let run (type a) pool (f : unit -> a) : a =
105131
in
106132
loop ()
107133

134+
let run pool f =
135+
Domain_local_await.using
136+
~prepare_for_await:(prepare_for_await (get_pool_data pool).task_chan)
137+
~while_running:(fun () -> run pool f)
138+
108139
let named_pools = Hashtbl.create 8
109140
let named_pools_mutex = Mutex.create ()
110141

test/dune

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515
(modules fib_par)
1616
(modes native))
1717

18+
(test
19+
(name kcas_integration)
20+
(libraries domainslib kcas)
21+
(modules kcas_integration)
22+
(modes native))
23+
1824
(test
1925
(name enumerate_par)
2026
(libraries domainslib)

test/kcas_integration.ml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
open Kcas
2+
module T = Domainslib.Task
3+
4+
let var = Loc.make None
5+
6+
let () =
7+
let n = 100 in
8+
let pool_domain =
9+
Domain.spawn @@ fun () ->
10+
let pool =
11+
T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 2) ()
12+
in
13+
T.run pool (fun () ->
14+
T.parallel_for ~start:1 ~finish:n
15+
~body:(fun i ->
16+
ignore @@ Loc.update var
17+
@@ function None -> Some i | _ -> Retry.later ())
18+
pool);
19+
T.teardown_pool pool;
20+
Printf.printf "Done\n%!"
21+
in
22+
for _ = 1 to n do
23+
match
24+
Loc.update var @@ function None -> Retry.later () | Some _ -> None
25+
with
26+
| None -> failwith "impossible"
27+
| Some i -> Printf.printf "Got %d\n%!" i
28+
done;
29+
Domain.join pool_domain

0 commit comments

Comments
 (0)