@@ -196,10 +196,8 @@ type t = {
196196 run_q : runnable Lf_queue .t ;
197197
198198 (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event.
199- In that case, [need_wakeup = true] and you must signal using [eventfd]. You must hold [eventfd_mutex]
200- when writing to or closing [eventfd]. *)
199+ In that case, [need_wakeup = true] and you must signal using [eventfd]. *)
201200 eventfd : FD .t ;
202- eventfd_mutex : Mutex .t ;
203201
204202 (* If [false], the main thread will check [run_q] before sleeping again
205203 (possibly because an event has been or will be sent to [eventfd]).
@@ -216,21 +214,22 @@ let wake_buffer =
216214 Bytes. set_int64_ne b 0 1L ;
217215 b
218216
217+ (* This can be called from any systhread (including ones not running Eio),
218+ and also from signal handlers or GC finalizers. It must not take any locks. *)
219219let wakeup t =
220- Mutex. lock t.eventfd_mutex;
221- match
222- Log. debug ( fun f -> f " Sending wakeup on eventfd %a " FD. pp t.eventfd);
223- Atomic. set t.need_wakeup false ; (* [t] will check [run_q] after getting the event below *)
224- let sent = Unix. single_write ( FD. get_exn " wakeup " t.eventfd) wake_buffer 0 8 in
220+ Atomic. set t.need_wakeup false ; (* [t] will check [run_q] after getting the event below *)
221+ match t.eventfd.fd with
222+ | `Closed -> () (* Domain has shut down (presumably after handling the event) *)
223+ | `Open fd ->
224+ let sent = Unix. single_write fd wake_buffer 0 8 in
225225 assert (sent = 8 )
226- with
227- | () -> Mutex. unlock t.eventfd_mutex
228- | exception ex -> Mutex. unlock t.eventfd_mutex; raise ex
229226
227+ (* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *)
230228let enqueue_thread st k x =
231229 Lf_queue. push st.run_q (Thread (k, x));
232230 if Atomic. get st.need_wakeup then wakeup st
233231
232+ (* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *)
234233let enqueue_failed_thread st k ex =
235234 Lf_queue. push st.run_q (Failed_thread (k, ex));
236235 if Atomic. get st.need_wakeup then wakeup st
@@ -964,7 +963,42 @@ module Low_level = struct
964963 |> List. filter_map to_eio_sockaddr_t
965964end
966965
967- external eio_eventfd : int -> Unix .file_descr = " caml_eio_eventfd"
966+ module EventFD_pool : sig
967+ (* We need to write to event FDs from signal handlers and GC finalizers.
968+ This means we can't take a lock, which means we can't easily prevent
969+ the owning domain from closing the FD while we're writing to it
970+ (which could result in us writing to an unrelated file if the FD
971+ got reused). To avoid that, we never close event FDs but just return them
972+ to a free pool.
973+
974+ The case where this matters is:
975+
976+ 1. Some other systhread calls [wakeup].
977+ 2. [wakeup] adds an item to the run-queue and sees it needs to send a wake-up event.
978+ 3. The domain wakes up for some other reason, handles the event, then shuts down.
979+ 4. The original systhread writes to the eventfd.
980+ *)
981+
982+ val get : unit -> Unix .file_descr
983+ (* Take the next free eventfd from the pool, or create a new one if the pool's empty.
984+ You might get a few spurious events from it as other threads are shutting down,
985+ so you must be able to cope with that. *)
986+
987+ val put : Unix .file_descr -> unit
988+ (* [put fd] adds [fd] to the free pool. *)
989+ end = struct
990+ external eio_eventfd : int -> Unix .file_descr = " caml_eio_eventfd"
991+
992+ let free = Lf_queue. create ()
993+
994+ let get () =
995+ match Lf_queue. pop free with
996+ | Some fd -> fd
997+ | None -> eio_eventfd 0
998+
999+ let put fd =
1000+ Lf_queue. push free fd
1001+ end
9681002
9691003type has_fd = < fd : FD .t >
9701004type source = < Eio.Flow .source ; Eio.Flow .close ; has_fd >
@@ -1405,12 +1439,11 @@ let rec run : type a.
14051439 in
14061440 let run_q = Lf_queue. create () in
14071441 Lf_queue. push run_q IO ;
1408- let eventfd_mutex = Mutex. create () in
14091442 let sleep_q = Zzz. create () in
14101443 let io_q = Queue. create () in
14111444 let mem_q = Queue. create () in
14121445 let eventfd = FD. placeholder ~seekable: false ~close_unix: false in
1413- let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic. make false ; sleep_q } in
1446+ let st = { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic. make false ; sleep_q } in
14141447 Log. debug (fun l -> l " starting main thread" );
14151448 let rec fork ~new_fiber :fiber fn =
14161449 let open Effect.Deep in
@@ -1542,13 +1575,11 @@ let rec run : type a.
15421575 let new_fiber = Fiber_context. make_root () in
15431576 fork ~new_fiber (fun () ->
15441577 Switch. run_protected (fun sw ->
1545- let fd = eio_eventfd 0 in
1578+ let fd = EventFD_pool. get () in
15461579 st.eventfd.fd < - `Open fd;
15471580 Switch. on_release sw (fun () ->
1548- Mutex. lock st.eventfd_mutex;
1549- FD. close st.eventfd;
1550- Mutex. unlock st.eventfd_mutex;
1551- Unix. close fd
1581+ let unix = FD. to_unix `Take st.eventfd in
1582+ EventFD_pool. put unix
15521583 );
15531584 Log. debug (fun f -> f " Monitoring eventfd %a" FD. pp st.eventfd);
15541585 result := Some (
0 commit comments