Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
(ocaml (>= 4.08))
dune
dune-configurator
kqueue
(alcotest :with-test))
(tags
(io multiplexing poll ppoll epoll kevent kqueue)))
1 change: 1 addition & 0 deletions iomux.opam
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ depends: [
"ocaml" {>= "4.08"}
"dune" {>= "3.6"}
"dune-configurator"
"kqueue"
"alcotest" {with-test}
"odoc" {with-doc}
]
Expand Down
2 changes: 1 addition & 1 deletion lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
(name iomux)
(public_name iomux)
(modules iomux config poll util)
(libraries unix)
(libraries unix kqueue)
(foreign_stubs
(language c)
(flags
Expand Down
112 changes: 92 additions & 20 deletions lib/poll.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,26 @@ end

let has_ppoll = Config.has_ppoll

let has_kqueue = Kqueue.available

let invalid_fd = unix_of_fd (-1)

type kqueue = {
kq : Kqueue.t;
changelist : Kqueue.Event_list.t;
mutable eventlist : Kqueue.Event_list.t;
}

type fds =
| Poll of buffer
| Kqueue of kqueue

let get_poll = function
| Poll b -> b
| Kqueue _ -> assert false

type t = {
buffer : buffer;
buffer : fds;
maxfds : int;
}

Expand All @@ -50,7 +66,7 @@ let poll t used timeout =
| Nowait -> 0
| Milliseconds ms -> ms
in
Raw.poll t.buffer used timeout
Raw.poll (get_poll t.buffer) used timeout

type ppoll_timeout =
| Infinite
Expand All @@ -63,43 +79,99 @@ let ppoll t used timeout sigmask =
| Nowait -> Int64.zero
| Nanoseconds timo -> timo
in
Raw.ppoll t.buffer used timeout sigmask

let ppoll_or_poll t used (timeout : ppoll_timeout) =
if has_ppoll then
ppoll t used timeout []
else
let timeout : poll_timeout = match timeout with
| Infinite -> Infinite
| Nowait -> Nowait
| Nanoseconds timo_ns ->
Milliseconds (Int64.(to_int @@ div (add timo_ns 999_999L) 1_000_000L))
in
poll t used timeout
Raw.ppoll (get_poll t.buffer) used timeout sigmask

let kqueue k nfds timeout =
let timeout = match timeout with
| Infinite -> Kqueue.Timeout.never
| Nowait -> Kqueue.Timeout.immediate
| Nanoseconds timo -> Kqueue.Timeout.of_ns timo
in
let eventlist = if nfds = 0 then Kqueue.Event_list.null else Kqueue.Event_list.create nfds in
let n = Kqueue.kevent k.kq ~changelist:Kqueue.Event_list.null ~eventlist timeout in
k.eventlist <- eventlist;
n

let ppoll_or_poll_or_kqueue t used (timeout : ppoll_timeout) =
match t.buffer with
| Kqueue k -> kqueue k used timeout
| Poll _ ->
if has_ppoll then
ppoll t used timeout []
else
let timeout : poll_timeout = match timeout with
| Infinite -> Infinite
| Nowait -> Nowait
| Nanoseconds timo_ns ->
Milliseconds (Int64.(to_int @@ div (add timo_ns 999_999L) 1_000_000L))
in
poll t used timeout

let guard_index t index =
if index >= t.maxfds || index < 0 then
invalid_arg "index out of bounds"

let set_index t index fd events =
guard_index t index;
Raw.set_index t.buffer index (fd_of_unix fd) events
match t.buffer with
| Kqueue k ->
let changelist = Kqueue.Event_list.create 1 in
let ev1 = Kqueue.Event_list.get changelist 0 in
let filter =
if Flags.(mem events pollin) then Kqueue.Filter.read
else Kqueue.Filter.write
in
let ev2 = Kqueue.Event_list.get k.changelist index in
List.iter (fun ev ->
Kqueue.Event_list.Event.set_ident ev (Kqueue.Util.file_descr_to_int fd);
Kqueue.Event_list.Event.set_filter ev filter;
Kqueue.Event_list.Event.set_flags ev Kqueue.Flag.add) [ ev1; ev2 ];
let v : int = Kqueue.kevent k.kq ~changelist ~eventlist:Kqueue.Event_list.null Kqueue.Timeout.immediate in
assert (v = 0)
| Poll buffer -> Raw.set_index buffer index (fd_of_unix fd) events

let invalidate_index t index =
guard_index t index;
Raw.set_index t.buffer index (-1) 0
match t.buffer with
| Kqueue k ->
let ev = Kqueue.Event_list.get k.changelist index in
Kqueue.Event_list.Event.set_flags ev Kqueue.Flag.delete
| Poll buffer ->
Raw.set_index buffer index (-1) 0

let kqueue_filter_to_poll f =
if Kqueue.Filter.(f = read) then Flags.pollin
else Flags.pollout

let get_revents t index =
guard_index t index;
Raw.get_revents t.buffer index
match t.buffer with
| Kqueue k ->
let ev = Kqueue.Event_list.get k.eventlist index in
Kqueue.Event_list.Event.get_filter ev |> kqueue_filter_to_poll
| Poll buffer ->
Raw.get_revents buffer index

let get_fd t index =
guard_index t index;
Raw.get_fd t.buffer index |> unix_of_fd
match t.buffer with
| Kqueue k ->
let ev = Kqueue.Event_list.get k.eventlist index in
Kqueue.Event_list.Event.get_ident ev |> Kqueue.Util.file_descr_of_int
| Poll buffer ->
Raw.get_fd buffer index |> unix_of_fd

let create ?(maxfds=Util.max_open_files ()) () =
let len = maxfds * Config.sizeof_pollfd in
let buffer = Bigarray.(Array1.create char c_layout len) in
let buffer =
if has_kqueue
then
let eventlist = Kqueue.Event_list.create 1 in
let changelist = Kqueue.Event_list.create maxfds in
let kq = { kq = Kqueue.create (); eventlist; changelist } in
Kqueue kq
else Poll (Bigarray.(Array1.create char c_layout len))
in
let t = { buffer; maxfds } in
for i = 0 to maxfds - 1 do
invalidate_index t i
Expand Down
2 changes: 1 addition & 1 deletion lib/poll.mli
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ val ppoll : t -> int -> ppoll_timeout -> int list -> int
below. *)

(** A more portable ppoll(2) call *)
val ppoll_or_poll : t -> int -> ppoll_timeout -> int
val ppoll_or_poll_or_kqueue : t -> int -> ppoll_timeout -> int
(** [ppoll_or_poll t nfds tiemout] is like {!ppoll} if the system
{!has_ppoll}, otherwise the call is emulated via {!poll}, notably
the timeout is internally converted to milliseconds and there is
Expand Down
12 changes: 6 additions & 6 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let check_bool = Alcotest.(check bool)
module U = struct
let with_leak_checker (f : unit -> unit) () =
let fetch () =
let l = List.init (Util.max_open_files () / 2) (fun _ -> Unix.(socket PF_UNIX SOCK_STREAM 0)) in
let l = List.init 0 (fun _ -> Unix.(socket PF_UNIX SOCK_STREAM 0)) in
List.iter Unix.close l;
l
in
Expand Down Expand Up @@ -42,7 +42,7 @@ module T = struct
Poll.set_index poll 0 r Poll.Flags.pollin;
let b = Bytes.create 1 in
check_int "write" (Unix.write w b 0 1) 1;
let nready = Poll.poll poll 1 Nowait in
let nready = Poll.ppoll_or_poll_or_kqueue poll 1 Nowait in
check_int "nready" nready 1;
let fd = Poll.get_fd poll 0 in
let revents = Poll.get_revents poll 0 in
Expand All @@ -63,9 +63,9 @@ module T = struct

let ppoll_or_poll () =
let poll = Poll.create () in
let n = Poll.ppoll_or_poll poll 0 Nowait in
let n = Poll.ppoll_or_poll_or_kqueue poll 0 Nowait in
check_int "n is zero" n 0;
let n = Poll.ppoll_or_poll poll 0 (Nanoseconds U.hundred_ms_in_ns) in
let n = Poll.ppoll_or_poll_or_kqueue poll 0 (Nanoseconds U.hundred_ms_in_ns) in
check_int "n is zero" n 0

let example () =
Expand All @@ -76,13 +76,13 @@ module T = struct
Poll.set_index poll 7 pipe_w Poll.Flags.pollout;
(* Wait why 8 ? we tell the kernel the number of file descriptors to scan,
unset filedescriptors are skipped, so indexes 1-6 are ignored *)
let nready = Poll.poll poll 8 Nowait in
let nready = Poll.ppoll_or_poll_or_kqueue poll 8 Nowait in
check_int "nread 1" 1 nready; (* only one entry should be ready, since we added only one *)
let n = Unix.write pipe_w (Bytes.create 1) 0 1 in
check_int "n" 1 n;
(* We'll now poll for both events, note that we don't need to re-add index 7 *)
Poll.set_index poll 0 pipe_r Poll.Flags.pollin;
let nready = Poll.poll poll 8 Nowait in
let nready = Poll.ppoll_or_poll_or_kqueue poll 8 Nowait in
check_int "nready" 2 nready;
Poll.iter_ready poll nready (fun index fd flags ->
if Poll.Flags.mem flags Poll.Flags.pollin then
Expand Down