Skip to content

Commit cfb049d

Browse files
committed
Thread pool simplifications
- Add `Run_in_systhread` effect. This makes the thread-pool part of the scheduler's state, avoiding the need for DLS. - Remove `max_standby_systhreads_per_domain`. Suggested by Vesa Karvonen. - Simplify termination. Use one atomic instead of two. - Use the scheduler's timer to drop idle threads. Modified Zzz to allow non-fiber timeouts.
1 parent 225c806 commit cfb049d

File tree

13 files changed

+227
-120
lines changed

13 files changed

+227
-120
lines changed

lib_eio/unix/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
(language c)
66
(include_dirs include)
77
(names fork_action stubs))
8-
(libraries eio unix threads mtime.clock.os))
8+
(libraries eio eio.utils unix threads mtime.clock.os))
99

1010
(rule
1111
(enabled_if %{bin-available:lintcstubs_arity_cmt})

lib_eio/unix/eio_unix.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ let () =
2222
let sleep d =
2323
Eio.Time.Mono.sleep (Effect.perform Private.Get_monotonic_clock) d
2424

25-
let run_in_systhread = Private.run_in_systhread
25+
let run_in_systhread = Thread_pool.run_in_systhread
2626

2727
module Ipaddr = Net.Ipaddr
2828

lib_eio/unix/eio_unix.mli

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ val sleep : float -> unit
5151
It can also be used in programs that don't care about tracking determinism. *)
5252

5353
val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a
54-
(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}).
55-
This allows blocking calls to be made non-blocking.
54+
(** [run_in_systhread fn] runs the function [fn] using a pool of system threads ({! Thread.t}).
55+
56+
This pool creates a new system thread if all threads are busy, it does not wait.
57+
[run_in_systhread] allows blocking calls to be made non-blocking.
5658
5759
@param label The operation name to use in trace output. *)
5860

@@ -97,6 +99,8 @@ module Private : sig
9799
module Rcfd = Rcfd
98100

99101
module Fork_action = Fork_action
102+
103+
module Thread_pool = Thread_pool
100104
end
101105

102106
module Pi = Pi

lib_eio/unix/net.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
5151
| `Udp _ -> [Unix.NI_DGRAM]
5252
in
5353
let sockaddr = sockaddr_to_unix sockaddr in
54-
Private.run_in_systhread ~label:"getnameinfo" (fun () ->
54+
Thread_pool.run_in_systhread ~label:"getnameinfo" (fun () ->
5555
let Unix.{ni_hostname; ni_service} = Unix.getnameinfo sockaddr options in
5656
(ni_hostname, ni_service))
5757

lib_eio/unix/private.ml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,4 @@ let pipe sw = Effect.perform (Pipe sw)
1616

1717
module Rcfd = Rcfd
1818
module Fork_action = Fork_action
19-
20-
let run_in_systhread ?(label="systhread") fn =
21-
Eio.Private.Suspend.enter label @@ fun _ctx enqueue ->
22-
Thread_pool.run_on_systhread ~enqueue fn
19+
module Thread_pool = Thread_pool

lib_eio/unix/thread_pool.ml

Lines changed: 100 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,18 @@
1-
(* This thread pool does not spawn threads in advance,
2-
but up to [max_standby_systhreads_per_domain] threads are
3-
kept alive to wait for more work to arrive.
4-
This number was chosen somewhat arbitrarily but benchmarking
5-
shows it to be a good compromise. *)
6-
let max_standby_systhreads_per_domain = 20
1+
module Zzz = Eio_utils.Zzz
72

83
type job =
94
| New
105
| Exit
116
| Job : {
12-
fn: unit -> 'a;
13-
enqueue: ('a, exn) result -> unit;
7+
fn : unit -> 'a;
8+
enqueue : ('a, exn) result -> unit;
149
} -> job
1510

1611
(* Mailbox with blocking semaphore *)
1712
module Mailbox = struct
1813
type t = {
19-
available: Semaphore.Binary.t;
20-
mutable cell: job;
14+
available : Semaphore.Binary.t;
15+
mutable cell : job;
2116
}
2217

2318
let create () = { available = Semaphore.Binary.make false; cell = New }
@@ -33,69 +28,102 @@ module Mailbox = struct
3328
mbox.cell
3429
end
3530

36-
(* A lock-free Treiber stack of systhreads on stand-by.
37-
A fresh thread is created if no thread is immediately available.
38-
When the domain exits all thread on stand-by are shutdown. *)
31+
module Free_pool = struct
32+
type list =
33+
| Empty
34+
| Closed
35+
| Free of Mailbox.t * list
36+
37+
type t = list Atomic.t
38+
39+
let rec close_list = function
40+
| Free (x, xs) -> Mailbox.put x Exit; close_list xs
41+
| Empty | Closed -> ()
42+
43+
let close t =
44+
let items = Atomic.exchange t Closed in
45+
close_list items
46+
47+
let rec drop t =
48+
match Atomic.get t with
49+
| Closed | Empty -> ()
50+
| Free _ as items ->
51+
if Atomic.compare_and_set t items Empty then close_list items
52+
else drop t
53+
54+
let rec put t mbox =
55+
match Atomic.get t with
56+
| Closed -> assert false
57+
| (Empty | Free _) as current ->
58+
let next = Free (mbox, current) in
59+
if not (Atomic.compare_and_set t current next) then
60+
put t mbox (* concurrent update, try again *)
61+
62+
let make_thread t =
63+
let mbox = Mailbox.create () in
64+
let _thread : Thread.t = Thread.create (fun () ->
65+
while true do
66+
match Mailbox.take mbox with
67+
| New -> assert false
68+
| Exit -> raise Thread.Exit
69+
| Job { fn; enqueue } ->
70+
let result = try Ok (fn ()) with exn -> Error exn in
71+
put t mbox; (* Ensure thread is in free-pool before enqueuing. *)
72+
enqueue result
73+
done
74+
) ()
75+
in
76+
mbox
77+
78+
let rec get_thread t =
79+
match Atomic.get t with
80+
| Closed -> invalid_arg "Thread pool closed!"
81+
| Empty -> make_thread t
82+
| Free (mbox, next) as current ->
83+
if Atomic.compare_and_set t current next then mbox
84+
else get_thread t (* concurrent update, try again *)
85+
end
86+
3987
type t = {
40-
threads: (Mailbox.t * int) list Atomic.t;
41-
terminating: bool Atomic.t;
88+
free : Free_pool.t;
89+
sleep_q : Zzz.t;
90+
mutable timeout : Zzz.Key.t option;
4291
}
4392

44-
let create () = { threads = Atomic.make []; terminating = Atomic.make false }
45-
46-
let terminate { threads; terminating } =
47-
Atomic.set terminating true;
48-
List.iter (fun (mbox, _) -> Mailbox.put mbox Exit) (Atomic.get threads)
49-
50-
let rec keep_thread_or_exit ({ threads; _ } as pool) mbox =
51-
match Atomic.get threads with
52-
| (_, count) :: _ when count >= max_standby_systhreads_per_domain ->
53-
(* We've got enough threads on stand-by, so discard the current thread *)
54-
raise Thread.Exit
55-
| current ->
56-
let count = match current with
57-
| [] -> 0
58-
| (_, count) :: _ -> count
93+
type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t
94+
95+
let terminate t =
96+
Free_pool.close t.free;
97+
Option.iter (fun key -> Zzz.remove t.sleep_q key; t.timeout <- None) t.timeout
98+
99+
let create ~sleep_q =
100+
{ free = Atomic.make Free_pool.Empty; sleep_q; timeout = None }
101+
102+
let run t fn =
103+
match fn () with
104+
| x -> terminate t; x
105+
| exception ex ->
106+
let bt = Printexc.get_raw_backtrace () in
107+
terminate t;
108+
Printexc.raise_with_backtrace ex bt
109+
110+
let submit t ~ctx ~enqueue fn =
111+
match Eio.Private.Fiber_context.get_error ctx with
112+
| Some e -> enqueue (Error e)
113+
| None ->
114+
let mbox = Free_pool.get_thread t.free in
115+
Mailbox.put mbox (Job { fn; enqueue })
116+
117+
let run_in_systhread ?(label="systhread") fn =
118+
Eio.Private.Trace.suspend_fiber label;
119+
let r, t = Effect.perform (Run_in_systhread fn) in
120+
if t.timeout = None then (
121+
let time =
122+
Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms)
123+
|> Option.value ~default:Mtime.max_stamp
59124
in
60-
if not (Atomic.compare_and_set threads current ((mbox, count + 1) :: current))
61-
then keep_thread_or_exit pool mbox (* concurrent update, try again *)
62-
63-
let make_thread pool =
64-
let mbox = Mailbox.create () in
65-
let _t : Thread.t = Thread.create (fun () ->
66-
while true do
67-
match Mailbox.take mbox with
68-
| New -> assert false
69-
| Exit -> raise Thread.Exit
70-
| Job { fn; enqueue } ->
71-
enqueue (try Ok (fn ()) with exn -> Error exn);
72-
(* We're not yielding inside of [keep_thread_or_exit] so
73-
no need to check [terminating] multiple times *)
74-
if Atomic.get pool.terminating then raise Thread.Exit;
75-
keep_thread_or_exit pool mbox
76-
done
77-
) ()
78-
in
79-
mbox
80-
81-
let rec get_thread ({ threads; _ } as pool) =
82-
match Atomic.get threads with
83-
| [] -> make_thread pool
84-
| ((mbox, _count) :: rest) as current ->
85-
if not (Atomic.compare_and_set threads current rest)
86-
then get_thread pool (* concurrent update, try again *)
87-
else mbox
88-
89-
(* https://v2.ocaml.org/manual/parallelism.html#s:par_systhread_interaction
90-
"Only one systhread at a time is allowed to run OCaml code on a particular domain."
91-
So we keep a separate threadpool per domain. *)
92-
let key =
93-
Domain.DLS.new_key @@ fun () ->
94-
let pool = create () in
95-
Domain.at_exit (fun () -> terminate pool);
96-
pool
97-
98-
let run_on_systhread ~enqueue fn =
99-
let pool = Domain.DLS.get key in
100-
let mbox = get_thread pool in
101-
Mailbox.put mbox (Job { fn; enqueue })
125+
t.timeout <- Some (Zzz.add t.sleep_q time (Fn (fun () -> Free_pool.drop t.free; t.timeout <- None)))
126+
);
127+
match r with
128+
| Ok x -> x
129+
| Error ex -> raise ex

lib_eio/unix/thread_pool.mli

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,25 @@
1-
val run_on_systhread : enqueue:(('a, exn) result -> unit) -> (unit -> 'a) -> unit
1+
(** A pool of systhreads, to avoid the overhead of creating a new thread for each operation. *)
2+
3+
type t
4+
5+
val create : sleep_q:Eio_utils.Zzz.t -> t
6+
(** [create ~sleep_q] is a new thread pool.
7+
8+
[sleep_q] is used to register a clean-up task to finish idle threads. *)
9+
10+
val run : t -> (unit -> 'a) -> 'a
11+
(** [run t fn] runs [fn ()] and then marks [t] as closed, releasing all idle threads. *)
12+
13+
val submit :
14+
t ->
15+
ctx:Eio.Private.Fiber_context.t ->
16+
enqueue:(('a, exn) result -> unit) ->
17+
(unit -> 'a) ->
18+
unit
19+
(** [submit t ~ctx ~enqueue fn] starts running [fn] in a sys-thread, which uses [enqueue] to return the result.
20+
21+
If [ctx] is already cancelled then the error is passed to [enqueue] immediately.
22+
Systhreads do not respond to cancellation once running. *)
23+
24+
type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t
25+
val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a

lib_eio/utils/zzz.ml

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ module Key = struct
33
let compare = Optint.Int63.compare
44
end
55

6+
type item =
7+
| Fiber of unit Suspended.t
8+
| Fn of (unit -> unit)
9+
610
module Job = struct
711
type t = {
812
time : Mtime.t;
9-
thread : unit Suspended.t;
13+
item : item;
1014
}
1115

1216
let compare a b = Mtime.compare a.time b.time
@@ -21,10 +25,10 @@ type t = {
2125

2226
let create () = { sleep_queue = Q.empty; next_id = Optint.Int63.zero }
2327

24-
let add t time thread =
28+
let add t time item =
2529
let id = t.next_id in
2630
t.next_id <- Optint.Int63.succ t.next_id;
27-
let sleeper = { Job.time; thread } in
31+
let sleeper = { Job.time; item } in
2832
t.sleep_queue <- Q.add id sleeper t.sleep_queue;
2933
id
3034

@@ -33,9 +37,13 @@ let remove t id =
3337

3438
let pop t ~now =
3539
match Q.min t.sleep_queue with
36-
| Some (_, { Job.time; thread }) when time <= now ->
37-
Eio.Private.Fiber_context.clear_cancel_fn thread.fiber;
40+
| Some (_, { Job.time; item }) when time <= now ->
41+
begin
42+
match item with
43+
| Fiber k -> Eio.Private.Fiber_context.clear_cancel_fn k.fiber
44+
| Fn _ -> ()
45+
end;
3846
t.sleep_queue <- Option.get (Q.rest t.sleep_queue);
39-
`Due thread
47+
`Due item
4048
| Some (_, { Job.time; _ }) -> `Wait_until time
4149
| None -> `Nothing

lib_eio/utils/zzz.mli

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,24 @@ end
88
type t
99
(** A set of timers (implemented as a priority queue). *)
1010

11+
type item =
12+
| Fiber of unit Suspended.t
13+
| Fn of (unit -> unit)
14+
1115
val create : unit -> t
1216
(** [create ()] is a fresh empty queue. *)
1317

14-
val add : t -> Mtime.t -> unit Suspended.t -> Key.t
15-
(** [add t time thread] adds a new event, due at [time], and returns its ID.
16-
You must use {!Eio.Private.Fiber_context.set_cancel_fn} on [thread] before
17-
calling {!pop}.
18-
Your cancel function should call {!remove} (in addition to resuming [thread]). *)
18+
val add : t -> Mtime.t -> item -> Key.t
19+
(** [add t time item] adds a new event, due at [time], and returns its ID.
20+
21+
If [item] is a {!Fiber},
22+
you must use {!Eio.Private.Fiber_context.set_cancel_fn} on it before calling {!pop}.
23+
Your cancel function should call {!remove} (in addition to resuming it). *)
1924

2025
val remove : t -> Key.t -> unit
2126
(** [remove t key] removes an event previously added with [add]. *)
2227

23-
val pop : t -> now:Mtime.t -> [`Due of unit Suspended.t | `Wait_until of Mtime.t | `Nothing]
24-
(** [pop ~now t] removes and returns the earliest thread due by [now].
25-
It also clears the thread's cancel function.
26-
If no thread is due yet, it returns the time the earliest thread becomes due. *)
28+
val pop : t -> now:Mtime.t -> [`Due of item | `Wait_until of Mtime.t | `Nothing]
29+
(** [pop ~now t] removes and returns the earliest item due by [now].
30+
For fibers, it also clears the thread's cancel function.
31+
If no item is due yet, it returns the time the earliest item becomes due. *)

lib_eio_linux/low_level.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ let noop () =
113113

114114
let sleep_until time =
115115
Sched.enter "sleep" @@ fun t k ->
116-
let job = Eio_utils.Zzz.add t.sleep_q time k in
116+
let job = Eio_utils.Zzz.add t.sleep_q time (Fiber k) in
117117
Eio.Private.Fiber_context.set_cancel_fn k.fiber (fun ex ->
118118
Eio_utils.Zzz.remove t.sleep_q job;
119119
Sched.enqueue_failed_thread t k ex

0 commit comments

Comments
 (0)