Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
(luv (>= 0.5.11))
(luv_unix (>= 0.5.0))
(mdx (and (>= 1.10.0) :with-test))
(logs (>= 0.7.0))
(fmt (>= 0.8.9))))
(package
(name eio_main)
Expand Down
1 change: 0 additions & 1 deletion eio_luv.opam
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ depends: [
"luv" {>= "0.5.11"}
"luv_unix" {>= "0.5.0"}
"mdx" {>= "1.10.0" & with-test}
"logs" {>= "0.7.0"}
"fmt" {>= "0.8.9"}
"odoc" {with-doc}
]
Expand Down
62 changes: 8 additions & 54 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ module FD = struct
Eio.Switch.remove_hook t.release_hook;
if t.close_unix then (
let res = Effect.perform (Close fd) in
Log.debug (fun l -> l "close: woken up");
if res < 0 then
raise (wrap_error (Uring.error_of_errno res) "close" (string_of_int (Obj.magic fd : int)))
)
Expand Down Expand Up @@ -115,11 +114,6 @@ module FD = struct
let uring_file_offset t =
if t.seekable then Optint.Int63.minus_one else Optint.Int63.zero

let pp f t =
match t.fd with
| `Open fd -> Fmt.pf f "%d" (Obj.magic fd : int)
| `Closed -> Fmt.string f "(closed)"

let fstat t =
(* todo: use uring *)
try
Expand Down Expand Up @@ -245,7 +239,6 @@ let enter fn = Effect.perform (Enter fn)

(* Cancellations always come from the same domain, so no need to send wake events here. *)
let rec enqueue_cancel job st =
Log.debug (fun l -> l "cancel: submitting call");
Ctf.label "cancel";
match Uring.cancel st.uring job Cancel_job with
| None -> Queue.push (fun st -> enqueue_cancel job st) st.io_q
Expand Down Expand Up @@ -315,7 +308,6 @@ let enqueue_read st action (file_offset,fd,buf,len) =
| None -> FD.uring_file_offset fd
in
let req = { op=`R; file_offset; len; fd; cur_off = 0; buf; action } in
Log.debug (fun l -> l "read: submitting call");
Ctf.label "read";
submit_rw_req st req

Expand Down Expand Up @@ -355,7 +347,6 @@ let rec enqueue_writev args st action =
Queue.push (fun st -> enqueue_writev args st action) st.io_q

let rec enqueue_poll_add fd poll_mask st action =
Log.debug (fun l -> l "poll_add: submitting call");
Ctf.label "poll_add";
match FD.get "poll_add" fd with
| Error ex -> enqueue_failed_thread st action ex
Expand All @@ -368,7 +359,6 @@ let rec enqueue_poll_add fd poll_mask st action =
Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q

let rec enqueue_poll_add_unix fd poll_mask st action cb =
Log.debug (fun l -> l "poll_add: submitting call");
Ctf.label "poll_add";
let retry = with_cancel_hook ~action st (fun () ->
Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb))
Expand All @@ -378,7 +368,6 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb =
Queue.push (fun st -> enqueue_poll_add_unix fd poll_mask st action cb) st.io_q

let rec enqueue_close st action fd =
Log.debug (fun l -> l "close: submitting call");
Ctf.label "close";
let subm = Uring.close st.uring fd (Job_no_cancel action) in
if subm = None then (* wait until an sqe is available *)
Expand All @@ -391,12 +380,10 @@ let enqueue_write st action (file_offset,fd,buf,len) =
| None -> FD.uring_file_offset fd
in
let req = { op=`W; file_offset; len; fd; cur_off = 0; buf; action } in
Log.debug (fun l -> l "write: submitting call");
Ctf.label "write";
submit_rw_req st req

let rec enqueue_splice ~src ~dst ~len st action =
Log.debug (fun l -> l "splice: submitting call");
Ctf.label "splice";
match FD.get "splice-src" src, FD.get "splice-dst" dst with
| Error ex, _
Expand All @@ -410,7 +397,6 @@ let rec enqueue_splice ~src ~dst ~len st action =
Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q

let rec enqueue_openat2 ((access, flags, perm, resolve, dir, path) as args) st action =
Log.debug (fun l -> l "openat2: submitting call");
Ctf.label "openat2";
let use fd =
let retry = with_cancel_hook ~action st (fun () ->
Expand Down Expand Up @@ -441,7 +427,6 @@ let rec enqueue_unlink ((dir, fd, path) as args) st action =
Queue.push (fun st -> enqueue_unlink args st action) st.io_q

let rec enqueue_connect fd addr st action =
Log.debug (fun l -> l "connect: submitting call");
Ctf.label "connect";
match FD.get "connect" fd with
| Error ex -> enqueue_failed_thread st action ex
Expand All @@ -464,7 +449,6 @@ let rec extract_fds = function
| Ok fds -> Ok (fd :: fds)

let rec enqueue_send_msg fd ~fds ~dst buf st action =
Log.debug (fun l -> l "send_msg: submitting call");
Ctf.label "send_msg";
match FD.get "send_msg" fd, extract_fds fds with
| Error ex, _
Expand All @@ -478,7 +462,6 @@ let rec enqueue_send_msg fd ~fds ~dst buf st action =
Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q

let rec enqueue_recv_msg fd msghdr st action =
Log.debug (fun l -> l "recv_msg: submitting call");
Ctf.label "recv_msg";
match FD.get "recv_msg" fd with
| Error ex -> enqueue_failed_thread st action ex
Expand All @@ -491,7 +474,6 @@ let rec enqueue_recv_msg fd msghdr st action =
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q

let rec enqueue_accept fd client_addr st action =
Log.debug (fun l -> l "accept: submitting call");
Ctf.label "accept";
match FD.get "accept" fd with
| Error ex -> enqueue_failed_thread st action ex
Expand All @@ -505,7 +487,6 @@ let rec enqueue_accept fd client_addr st action =
)

let rec enqueue_noop st action =
Log.debug (fun l -> l "noop: submitting call");
Ctf.label "noop";
let retry = (Uring.noop st.uring (Job_no_cancel action) = None) in
if retry then (
Expand Down Expand Up @@ -558,17 +539,13 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
Some (diff_ns /. 1e9)
| `Nothing -> None
in
Log.debug (fun l -> l "@[<v2>scheduler out of jobs, next timeout %s:@,%a@]"
(match timeout with None -> "inf" | Some v -> string_of_float v)
Uring.Stats.pp (Uring.get_debug_stats uring));
if not (Lf_queue.is_empty st.run_q) then (
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
schedule st
) else if timeout = None && Uring.active_ops uring = 0 then (
(* Nothing further can happen at this point.
If there are no events in progress but also still no memory available, something has gone wrong! *)
assert (Queue.length mem_q = 0);
Log.debug (fun l -> l "schedule: exiting"); (* Nothing left to do *)
Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *)
`Exit_scheduler
) else (
Expand Down Expand Up @@ -600,10 +577,8 @@ and handle_complete st ~runnable result =
submit_pending_io st; (* If something was waiting for a slot, submit it now. *)
match runnable with
| Read req ->
Log.debug (fun l -> l "read returned");
complete_rw_req st req result
| Write req ->
Log.debug (fun l -> l "write returned");
complete_rw_req st req result
| Job k ->
Fiber_context.clear_cancel_fn k.fiber;
Expand All @@ -618,14 +593,14 @@ and handle_complete st ~runnable result =
| Job_no_cancel k ->
Suspended.continue k result
| Cancel_job ->
Log.debug (fun l -> l "cancel returned");
if result = -2 then (
Log.debug (fun f -> f "Cancel returned ENOENT - operation completed before cancel took effect")
) else if result = -114 then (
Log.debug (fun f -> f "Cancel returned EALREADY - operation cancelled while already in progress")
) else if result <> 0 then (
Log.warn (fun f -> f "Cancel returned unexpected error: %s" (Unix.error_message (Uring.error_of_errno result)))
);
begin match result with
| 0 (* Operation cancelled successfully *)
| -2 (* ENOENT - operation completed before cancel took effect *)
| -114 (* EALREADY - operation already in progress *)
-> ()
| errno ->
Log.warn (fun f -> f "Cancel returned unexpected error: %s" (Unix.error_message (Uring.error_of_errno errno)))
end;
schedule st
| Job_fn (k, f) ->
Fiber_context.clear_cancel_fn k.fiber;
Expand Down Expand Up @@ -662,7 +637,6 @@ module Low_level = struct
match st.mem with
| None -> Suspended.discontinue k (Failure "No fixed buffer available")
| Some mem ->
Log.debug (fun l -> l "alloc: %d" (Uring.Region.avail mem));
match Uring.Region.alloc mem with
| buf -> Suspended.continue k buf
| exception Uring.Region.No_space ->
Expand All @@ -676,7 +650,6 @@ module Low_level = struct

let noop () =
let result = enter enqueue_noop in
Log.debug (fun l -> l "noop returned");
if result <> 0 then raise (unclassified_error (Eio_unix.Unix_error (Uring.error_of_errno result, "noop", "")))

type _ Effect.t += Sleep_until : Mtime.t -> unit Effect.t
Expand All @@ -687,14 +660,12 @@ module Low_level = struct

let read_exactly ?file_offset fd buf len =
let res = Effect.perform (ERead (file_offset, fd, buf, Exactly len)) in
Log.debug (fun l -> l "read_exactly: woken up after read");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "read_exactly" ""
)

let read_upto ?file_offset fd buf len =
let res = Effect.perform (ERead (file_offset, fd, buf, Upto len)) in
Log.debug (fun l -> l "read_upto: woken up after read");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "read_upto" ""
) else (
Expand All @@ -703,7 +674,6 @@ module Low_level = struct

let readv ?file_offset fd bufs =
let res = enter (enqueue_readv (file_offset, fd, bufs)) in
Log.debug (fun l -> l "readv: woken up after read");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "readv" ""
) else if res = 0 then (
Expand All @@ -714,7 +684,6 @@ module Low_level = struct

let writev_single ?file_offset fd bufs =
let res = enter (enqueue_writev (file_offset, fd, bufs)) in
Log.debug (fun l -> l "writev: woken up after write");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "writev" ""
) else (
Expand All @@ -737,14 +706,12 @@ module Low_level = struct

let await_readable fd =
let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in
Log.debug (fun l -> l "await_readable: woken up");
if res < 0 then (
raise (unclassified_error (Eio_unix.Unix_error (Uring.error_of_errno res, "await_readable", "")))
)

let await_writable fd =
let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollout + pollerr))) in
Log.debug (fun l -> l "await_writable: woken up");
if res < 0 then (
raise (unclassified_error (Eio_unix.Unix_error (Uring.error_of_errno res, "await_writable", "")))
)
Expand All @@ -753,7 +720,6 @@ module Low_level = struct

let write ?file_offset fd buf len =
let res = Effect.perform (EWrite (file_offset, fd, buf, Exactly len)) in
Log.debug (fun l -> l "write: woken up after write");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "write" ""
)
Expand All @@ -769,14 +735,12 @@ module Low_level = struct

let splice src ~dst ~len =
let res = enter (enqueue_splice ~src ~dst ~len) in
Log.debug (fun l -> l "splice returned");
if res > 0 then res
else if res = 0 then raise End_of_file
else raise @@ wrap_error (Uring.error_of_errno res) "splice" ""

let connect fd addr =
let res = enter (enqueue_connect fd addr) in
Log.debug (fun l -> l "connect returned");
if res < 0 then (
let ex =
match addr with
Expand All @@ -788,7 +752,6 @@ module Low_level = struct

let send_msg fd ?(fds=[]) ?dst buf =
let res = enter (enqueue_send_msg fd ~fds ~dst buf) in
Log.debug (fun l -> l "send_msg returned");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "send_msg" ""
)
Expand All @@ -797,7 +760,6 @@ module Low_level = struct
let addr = Uring.Sockaddr.create () in
let msghdr = Uring.Msghdr.create ~addr buf in
let res = enter (enqueue_recv_msg fd msghdr) in
Log.debug (fun l -> l "recv_msg returned");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "recv_msg" ""
);
Expand All @@ -807,7 +769,6 @@ module Low_level = struct
let addr = Uring.Sockaddr.create () in
let msghdr = Uring.Msghdr.create ~n_fds:max_fds ~addr buf in
let res = enter (enqueue_recv_msg fd msghdr) in
Log.debug (fun l -> l "recv_msg returned");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "recv_msg" ""
);
Expand All @@ -827,7 +788,6 @@ module Low_level = struct

let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path =
let res = enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in
Log.debug (fun l -> l "openat2 returned");
if res < 0 then (
Switch.check sw; (* If cancelled, report that instead. *)
raise @@ wrap_error_fs (Uring.error_of_errno res) "openat2" ""
Expand Down Expand Up @@ -912,7 +872,6 @@ module Low_level = struct
Ctf.label "accept";
let client_addr = Uring.Sockaddr.create () in
let res = enter (enqueue_accept fd client_addr) in
Log.debug (fun l -> l "accept returned");
if res < 0 then (
raise @@ wrap_error (Uring.error_of_errno res) "accept" ""
) else (
Expand Down Expand Up @@ -1397,7 +1356,6 @@ let monitor_event_fd t =
let buf = Cstruct.create 8 in
while true do
let got = Low_level.readv t.eventfd [buf] in
Log.debug (fun f -> f "Received wakeup on eventfd %a" FD.pp t.eventfd);
assert (got = 8);
(* We just go back to sleep now, but this will cause the scheduler to look
at the run queue again and notice any new items. *)
Expand All @@ -1424,7 +1382,6 @@ let with_uring ~queue_depth ?polling_timeout ?(fallback=no_fallback) fn =
let rec run : type a.
?queue_depth:int -> ?n_blocks:int -> ?block_size:int -> ?polling_timeout:int -> ?fallback:(_ -> a) -> (_ -> a) -> a =
fun ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout ?fallback main ->
Log.debug (fun l -> l "starting run");
let n_blocks = Option.value n_blocks ~default:queue_depth in
let stdenv = stdenv ~run_event_loop:(run ~queue_depth ~n_blocks ~block_size ?polling_timeout ?fallback:None) in
(* TODO unify this allocation API around baregion/uring *)
Expand All @@ -1446,7 +1403,6 @@ let rec run : type a.
let mem_q = Queue.create () in
let eventfd = FD.placeholder ~seekable:false ~close_unix:false in
let st = { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } in
Log.debug (fun l -> l "starting main thread");
let rec fork ~new_fiber:fiber fn =
let open Effect.Deep in
Ctf.note_switch (Fiber_context.tid fiber);
Expand Down Expand Up @@ -1582,7 +1538,6 @@ let rec run : type a.
let unix = FD.to_unix `Take st.eventfd in
EventFD_pool.put unix
);
Log.debug (fun f -> f "Monitoring eventfd %a" FD.pp st.eventfd);
result := Some (
Fiber.first
(fun () -> main stdenv)
Expand All @@ -1591,5 +1546,4 @@ let rec run : type a.
)
)
in
Log.debug (fun l -> l "exit");
Option.get !result
2 changes: 1 addition & 1 deletion lib_eio_luv/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name eio_luv)
(public_name eio_luv)
(libraries eio eio.unix luv luv_unix eio.utils logs fmt))
(libraries eio eio.unix luv luv_unix eio.utils fmt))
Loading