Skip to content

Commit

Permalink
Add zmq-eio
Browse files Browse the repository at this point in the history
  • Loading branch information
andersfugmann committed Jul 4, 2023
1 parent 82a2707 commit 47b2fe3
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 0 deletions.
12 changes: 12 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,15 @@ Async aware bindings to zmq are available though package zmq-async")
(zmq (= :version))
(lwt (>= 2.6.0))
(ounit2 :with-test)))

(package
(name zmq-eio)
(authors "Anders Fugmann")
(synopsis "Eio aware bindings to ZMQ")
(depends
(ocaml (>= 4.04.1))
(zmq (= :version))
(eio (>= 0.10))
(eio_unix (>= 0.9))
(base (>= v0.11.0))
(ounit2 :with-test)))
33 changes: 33 additions & 0 deletions zmq-eio.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Eio aware bindings to ZMQ"
maintainer: ["Anders Fugmann <anders@fugmann.net>"]
authors: ["Anders Fugmann"]
license: "MIT"
homepage: "https://github.com/issuu/ocaml-zmq"
bug-reports: "https://github.com/issuu/ocaml-zmq/issues"
depends: [
"dune" {>= "2.7"}
"ocaml" {>= "4.04.1"}
"zmq" {= version}
"eio" {>= "0.10"}
"eio_unix" {>= "0.9"}
"base" {>= "v0.11.0"}
"ounit2" {with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/issuu/ocaml-zmq.git"
4 changes: 4 additions & 0 deletions zmq-eio/src/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(library
(name zmq_eio)
(public_name zmq-eio)
(libraries zmq.deferred eio eio_main base))
159 changes: 159 additions & 0 deletions zmq-eio/src/socket.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
(** Eio based bindings for eio *)
exception Closed

type 'a t = {
socket : 'a Zmq.Socket.t;
fd : Unix.file_descr;
senders : (unit -> unit) Queue.t;
receivers : (unit -> unit) Queue.t;
condition : Eio.Condition.t;
mutex : Eio.Mutex.t;
ready_condition: Eio.Condition.t;
mutable thread : unit Eio.Promise.or_exn option; (* None indicates already closed *)
}

type 'a of_socket_args = sw:Eio.Switch.t -> 'a
type 'a deferred = 'a

(** invoke the first function on the queue, but only pop it if it does not raise EAGAIN *)
let process queue =
match (Queue.peek queue) () with
| () ->
let (_: unit -> unit) = Queue.pop queue in
()
| exception Unix.Unix_error (Unix.EAGAIN, _, _) ->
(* If f raised EAGAIN, dont pop the message. *)
(* This should never happen. If so, the queue could be replaced with a Eio.Stream for faster handling *)
()

let with_lock lock f =
Eio.Mutex.lock lock;
try
let v = f () in
Eio.Mutex.unlock lock;
v
with
| e ->
Eio.Mutex.unlock lock;
raise e

let rec fd_monitor t =
Eio.Condition.await_no_mutex t.ready_condition;
Eio_unix.await_readable t.fd;
with_lock t.mutex (fun () -> Eio.Condition.broadcast t.condition);
fd_monitor t

let rec event_loop t =
let inner () =
match Zmq.Socket.events t.socket with
| Zmq.Socket.Poll_error ->
failwith "Cannot poll socket"
| (Poll_in_out | Poll_in) when not (Queue.is_empty t.receivers) ->
process t.receivers
| (Poll_in_out | Poll_out) when not (Queue.is_empty t.senders) ->
process t.senders
| _ ->
Eio.Condition.broadcast t.ready_condition;
Eio.Condition.await t.condition t.mutex;
in
with_lock t.mutex (fun () -> inner ());
match t.thread with
| None when Queue.is_empty t.senders && Queue.is_empty t.receivers ->
()
| _ ->
event_loop t

let of_socket: ('a Zmq.Socket.t -> 'a t) of_socket_args = fun ~sw socket ->
let fd = Zmq.Socket.get_fd socket in
let t =
{ socket;
fd;
senders = Queue.create ();
receivers = Queue.create ();
mutex = Eio.Mutex.create ();
condition = Eio.Condition.create ();
ready_condition = Eio.Condition.create ();
thread = None;
}
in
let thread = Eio.Fiber.fork_promise ~sw (fun () ->
Eio.Switch.run (fun sw ->
Eio.Fiber.fork ~sw (fun () -> event_loop t);
Eio.Fiber.fork_daemon ~sw (fun () -> fd_monitor t);
()
);
)
in
t.thread <- Some thread;
t

let to_socket t =
t.socket

(** Stop the deamon thread, and ensure that all sends and receives has been handled *)
let close t =
let thread = match t.thread with
| None -> failwith "Socket already closed"
| Some t -> t
in
with_lock t.mutex (fun () -> t.thread <- None; Eio.Condition.broadcast t.condition);
let _e = Eio.Promise.await_exn thread in
Zmq.Socket.close t.socket;
()


let request t queue f =
let () =
match t.thread with
| None -> raise Closed
| Some _ -> ()
in
let (pt, pu) = Eio.Promise.create ~label:"Zmq" () in
let f () =
let v = f () in
Eio.Promise.resolve pu v
in
with_lock t.mutex (fun () -> Queue.push f queue; Eio.Condition.broadcast t.condition);
Eio.Promise.await pt

let send t message =
request t t.senders (fun () -> Zmq.Socket.send ~block:false t.socket message)

let send_msg t message =
request t t.senders (fun () -> Zmq.Socket.send_msg ~block:false t.socket message)

let send_all t =
request t t.receivers (fun () -> Zmq.Socket.send_all ~block:false t.socket)

let send_msg_all t =
request t t.receivers (fun () -> Zmq.Socket.send_msg_all ~block:false t.socket)

let recv t =
request t t.receivers (fun () -> Zmq.Socket.recv ~block:false t.socket)

let recv_msg t =
request t t.receivers (fun () -> Zmq.Socket.recv_msg ~block:false t.socket)

let recv_all t =
request t t.receivers (fun () -> Zmq.Socket.recv_all ~block:false t.socket)

let recv_msg_all t =
request t t.receivers (fun () -> Zmq.Socket.recv_msg_all ~block:false t.socket)

module Router = struct
type id_t = string

let id_of_string t = t

let recv t =
match recv_all t with
| id :: message -> (id, message)
| _ -> assert false

let send t id message =
send_all t (id :: message)
end

module Monitor = struct
let recv t = request t t.receivers (fun () -> Zmq.Monitor.recv ~block:false t.socket)
end
1 change: 1 addition & 0 deletions zmq-eio/src/socket.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include Zmq_deferred.Socket.Socket with type 'a deferred = 'a and type 'a of_socket_args = sw:Eio.Switch.t -> 'a
11 changes: 11 additions & 0 deletions zmq-eio/test/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(executable
(name test)
(libraries zmq-eio ounit2 eio eio_main))

(rule
(alias runtest)
(deps
(:test test.exe))
(action
(run %{test}))
(package zmq-eio))
141 changes: 141 additions & 0 deletions zmq-eio/test/test.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
open OUnit

(** Sleep is horrible slow, or causing strange slowdowns *)
let sleepf env secs = Eio.Time.sleep (Eio.Stdenv.clock env) secs

let setup ~sw env =
let make ctx tpe =
let s = Zmq.Socket.create ctx tpe in
Zmq.Socket.set_receive_high_water_mark s 1;
Zmq.Socket.set_send_high_water_mark s 2;
s
in
let ctx = Zmq.Context.create () in
let s1 = make ctx Zmq.Socket.pair in
let s2 = make ctx Zmq.Socket.pair in
let endpoint = "inproc://test" in
Zmq.Socket.bind s1 endpoint;
Zmq.Socket.connect s2 endpoint;
(* Sleep a bit *)
sleepf env 0.0001;
(ctx, Zmq_eio.Socket.of_socket ~sw s1, Zmq_eio.Socket.of_socket ~sw s2)

let teardown ~sw:_ _env (ctx, s1, s2) =
Zmq_eio.Socket.close s2;
Zmq_eio.Socket.close s1;
Zmq.Context.terminate ctx;
()

let all_ok l =
Eio.Fiber.List.iter (fun f -> f ()) l

let send env ?(delay = 0.0) s count =
let rec inner = function
| 0 -> ()
| n ->
Zmq_eio.Socket.send s "test";
sleepf env delay;
inner (n - 1)
in
fun () -> inner count

let recv env ?(delay = 0.0) s count =
let rec inner = function
| 0 -> ()
| n ->
let _ = Zmq_eio.Socket.recv s in
sleepf env delay;
inner (n - 1)
in
fun () -> inner count

(** Test functions *)
let test_setup_teardown ~sw:_ _env _s = ()

let count = 1000
(* Tests *)
let test_send_receive ~sw:_ env (_, s1, s2) =
all_ok [
send env s2 count;
recv env s1 count;
]

let test_msend_mreceive ~sw:_ env (_, s1, s2) =
all_ok [
send env s2 count; send env s2 count; send env s2 count; send env s2 count;
recv env s1 count; recv env s1 count; recv env s1 count; recv env s1 count;
]

let test_mix ~sw:_ env (_, s1, s2) =
all_ok [
send env s2 count; recv env s1 count;
send env s1 count; recv env s2 count;
send env s2 count; recv env s1 count;
send env s1 count; recv env s2 count;
send env s2 count; recv env s1 count;
]

let test_slow_send ~sw:_ env (_, s1, s2) =
all_ok [
recv env ~delay:0.0001 s2 count;
send env s1 (count / 5);
send env s1 (count / 5);
send env s1 (count / 5);
send env s1 (count / 5);
send env s1 (count / 5);
]

let test_slow_receive ~sw:_ env (_, s1, s2) =
all_ok [
send env ~delay:0.0001 s2 count;
recv env s1 (count / 5);
recv env s1 (count / 5);
recv env s1 (count / 5);
recv env s1 (count / 5);
recv env s1 (count / 5);
]

let test_slow_mix1 ~sw:_ env (_, s1, s2) =
all_ok [
send env ~delay:0.0001 s2 count; recv env ~delay:0.0002 s1 count;
send env ~delay:0.0001 s1 count; recv env ~delay:0.0002 s2 count;
send env ~delay:0.0001 s2 count; recv env ~delay:0.0002 s1 count;
send env ~delay:0.0001 s1 count; recv env ~delay:0.0002 s2 count;
]

let test_slow_mix2 ~sw:_ env (_, s1, s2) =
all_ok [
send env ~delay:0.0002 s2 count; recv env ~delay:0.0001 s1 count;
send env ~delay:0.0002 s1 count; recv env ~delay:0.0001 s2 count;
send env ~delay:0.0002 s2 count; recv env ~delay:0.0001 s1 count;
send env ~delay:0.0002 s1 count; recv env ~delay:0.0001 s2 count;
]


let suite () =
let bracket test =
let f sw env =
let s = setup ~sw env in
match test ~sw env s with
| v -> teardown ~sw env s; v
| exception e -> teardown ~sw env s; raise e
in
fun () -> Eio_linux.run (fun env ->
Eio.Switch.run (fun sw -> f sw env))
in

__MODULE__ >::: [
"test_setup_teardown" >:: bracket test_setup_teardown;
"test_send_receive" >:: bracket test_send_receive;
"test_msend_mreceive" >:: bracket test_msend_mreceive;
"test_mix" >:: bracket test_mix;
"test_slow_send" >:: bracket test_slow_send;
"test_slow_receive" >:: bracket test_slow_receive;
"test_slow_mix" >:: bracket test_slow_mix1;
"test_slow_mix" >:: bracket test_slow_mix2;
"test_send_receive" >:: bracket test_send_receive;
]


let () =
run_test_tt_main (suite ()) |> ignore

0 comments on commit 47b2fe3

Please sign in to comment.