diff --git a/dune-project b/dune-project index dbdcbce..d6d06b5 100644 --- a/dune-project +++ b/dune-project @@ -43,3 +43,15 @@ Async aware bindings to zmq are available though package zmq-async") (zmq (= :version)) (lwt (>= 2.6.0)) (ounit2 :with-test))) + + (package + (name zmq-eio) + (authors "Anders Fugmann") + (synopsis "Eio aware bindings to ZMQ") + (depends + (ocaml (>= 4.04.1)) + (zmq (= :version)) + (eio (>= 0.10)) + (eio_unix (>= 0.9)) + (base (>= v0.11.0)) + (ounit2 :with-test))) diff --git a/zmq-eio.opam b/zmq-eio.opam new file mode 100644 index 0000000..d2b63a0 --- /dev/null +++ b/zmq-eio.opam @@ -0,0 +1,33 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Eio aware bindings to ZMQ" +maintainer: ["Anders Fugmann "] +authors: ["Anders Fugmann"] +license: "MIT" +homepage: "https://github.com/issuu/ocaml-zmq" +bug-reports: "https://github.com/issuu/ocaml-zmq/issues" +depends: [ + "dune" {>= "2.7"} + "ocaml" {>= "4.04.1"} + "zmq" {= version} + "eio" {>= "0.10"} + "eio_unix" {>= "0.9"} + "base" {>= "v0.11.0"} + "ounit2" {with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/issuu/ocaml-zmq.git" diff --git a/zmq-eio/src/dune b/zmq-eio/src/dune new file mode 100644 index 0000000..b254135 --- /dev/null +++ b/zmq-eio/src/dune @@ -0,0 +1,4 @@ +(library + (name zmq_eio) + (public_name zmq-eio) + (libraries zmq.deferred eio eio_main base)) diff --git a/zmq-eio/src/socket.ml b/zmq-eio/src/socket.ml new file mode 100644 index 0000000..4445e6c --- /dev/null +++ b/zmq-eio/src/socket.ml @@ -0,0 +1,159 @@ +(** Eio based bindings for eio *) +exception Closed + +type 'a t = { + socket : 'a Zmq.Socket.t; + fd : Unix.file_descr; + senders : (unit -> unit) Queue.t; + receivers : (unit -> unit) Queue.t; + condition : Eio.Condition.t; + mutex : Eio.Mutex.t; + ready_condition: Eio.Condition.t; + mutable thread : unit Eio.Promise.or_exn option; (* None indicates already closed *) +} + +type 'a of_socket_args = sw:Eio.Switch.t -> 'a +type 'a deferred = 'a + +(** invoke the first function on the queue, but only pop it if it does not raise EAGAIN *) +let process queue = + match (Queue.peek queue) () with + | () -> + let (_: unit -> unit) = Queue.pop queue in + () + | exception Unix.Unix_error (Unix.EAGAIN, _, _) -> + (* If f raised EAGAIN, dont pop the message. *) + (* This should never happen. If so, the queue could be replaced with a Eio.Stream for faster handling *) + () + +let with_lock lock f = + Eio.Mutex.lock lock; + try + let v = f () in + Eio.Mutex.unlock lock; + v + with + | e -> + Eio.Mutex.unlock lock; + raise e + +let rec fd_monitor t = + Eio.Condition.await_no_mutex t.ready_condition; + Eio_unix.await_readable t.fd; + with_lock t.mutex (fun () -> Eio.Condition.broadcast t.condition); + fd_monitor t + +let rec event_loop t = + let inner () = + match Zmq.Socket.events t.socket with + | Zmq.Socket.Poll_error -> + failwith "Cannot poll socket" + | (Poll_in_out | Poll_in) when not (Queue.is_empty t.receivers) -> + process t.receivers + | (Poll_in_out | Poll_out) when not (Queue.is_empty t.senders) -> + process t.senders + | _ -> + Eio.Condition.broadcast t.ready_condition; + Eio.Condition.await t.condition t.mutex; + in + with_lock t.mutex (fun () -> inner ()); + match t.thread with + | None when Queue.is_empty t.senders && Queue.is_empty t.receivers -> + () + | _ -> + event_loop t + +let of_socket: ('a Zmq.Socket.t -> 'a t) of_socket_args = fun ~sw socket -> + let fd = Zmq.Socket.get_fd socket in + let t = + { socket; + fd; + senders = Queue.create (); + receivers = Queue.create (); + mutex = Eio.Mutex.create (); + condition = Eio.Condition.create (); + ready_condition = Eio.Condition.create (); + thread = None; + } + in + let thread = Eio.Fiber.fork_promise ~sw (fun () -> + Eio.Switch.run (fun sw -> + Eio.Fiber.fork ~sw (fun () -> event_loop t); + Eio.Fiber.fork_daemon ~sw (fun () -> fd_monitor t); + () + ); + ) + in + t.thread <- Some thread; + t + +let to_socket t = + t.socket + +(** Stop the deamon thread, and ensure that all sends and receives has been handled *) +let close t = + let thread = match t.thread with + | None -> failwith "Socket already closed" + | Some t -> t + in + with_lock t.mutex (fun () -> t.thread <- None; Eio.Condition.broadcast t.condition); + let _e = Eio.Promise.await_exn thread in + Zmq.Socket.close t.socket; + () + + +let request t queue f = + let () = + match t.thread with + | None -> raise Closed + | Some _ -> () + in + let (pt, pu) = Eio.Promise.create ~label:"Zmq" () in + let f () = + let v = f () in + Eio.Promise.resolve pu v + in + with_lock t.mutex (fun () -> Queue.push f queue; Eio.Condition.broadcast t.condition); + Eio.Promise.await pt + +let send t message = + request t t.senders (fun () -> Zmq.Socket.send ~block:false t.socket message) + +let send_msg t message = + request t t.senders (fun () -> Zmq.Socket.send_msg ~block:false t.socket message) + +let send_all t = + request t t.receivers (fun () -> Zmq.Socket.send_all ~block:false t.socket) + +let send_msg_all t = + request t t.receivers (fun () -> Zmq.Socket.send_msg_all ~block:false t.socket) + +let recv t = + request t t.receivers (fun () -> Zmq.Socket.recv ~block:false t.socket) + +let recv_msg t = + request t t.receivers (fun () -> Zmq.Socket.recv_msg ~block:false t.socket) + +let recv_all t = + request t t.receivers (fun () -> Zmq.Socket.recv_all ~block:false t.socket) + +let recv_msg_all t = + request t t.receivers (fun () -> Zmq.Socket.recv_msg_all ~block:false t.socket) + +module Router = struct + type id_t = string + + let id_of_string t = t + + let recv t = + match recv_all t with + | id :: message -> (id, message) + | _ -> assert false + + let send t id message = + send_all t (id :: message) +end + +module Monitor = struct + let recv t = request t t.receivers (fun () -> Zmq.Monitor.recv ~block:false t.socket) +end diff --git a/zmq-eio/src/socket.mli b/zmq-eio/src/socket.mli new file mode 100644 index 0000000..6fed4e0 --- /dev/null +++ b/zmq-eio/src/socket.mli @@ -0,0 +1 @@ +include Zmq_deferred.Socket.Socket with type 'a deferred = 'a and type 'a of_socket_args = sw:Eio.Switch.t -> 'a diff --git a/zmq-eio/test/dune b/zmq-eio/test/dune new file mode 100644 index 0000000..9f0b88d --- /dev/null +++ b/zmq-eio/test/dune @@ -0,0 +1,11 @@ +(executable + (name test) + (libraries zmq-eio ounit2 eio eio_main)) + +(rule + (alias runtest) + (deps + (:test test.exe)) + (action + (run %{test})) + (package zmq-eio)) diff --git a/zmq-eio/test/test.ml b/zmq-eio/test/test.ml new file mode 100644 index 0000000..8845fea --- /dev/null +++ b/zmq-eio/test/test.ml @@ -0,0 +1,141 @@ +open OUnit + +(** Sleep is horrible slow, or causing strange slowdowns *) +let sleepf env secs = Eio.Time.sleep (Eio.Stdenv.clock env) secs + +let setup ~sw env = + let make ctx tpe = + let s = Zmq.Socket.create ctx tpe in + Zmq.Socket.set_receive_high_water_mark s 1; + Zmq.Socket.set_send_high_water_mark s 2; + s + in + let ctx = Zmq.Context.create () in + let s1 = make ctx Zmq.Socket.pair in + let s2 = make ctx Zmq.Socket.pair in + let endpoint = "inproc://test" in + Zmq.Socket.bind s1 endpoint; + Zmq.Socket.connect s2 endpoint; + (* Sleep a bit *) + sleepf env 0.0001; + (ctx, Zmq_eio.Socket.of_socket ~sw s1, Zmq_eio.Socket.of_socket ~sw s2) + +let teardown ~sw:_ _env (ctx, s1, s2) = + Zmq_eio.Socket.close s2; + Zmq_eio.Socket.close s1; + Zmq.Context.terminate ctx; + () + +let all_ok l = + Eio.Fiber.List.iter (fun f -> f ()) l + +let send env ?(delay = 0.0) s count = + let rec inner = function + | 0 -> () + | n -> + Zmq_eio.Socket.send s "test"; + sleepf env delay; + inner (n - 1) + in + fun () -> inner count + +let recv env ?(delay = 0.0) s count = + let rec inner = function + | 0 -> () + | n -> + let _ = Zmq_eio.Socket.recv s in + sleepf env delay; + inner (n - 1) + in + fun () -> inner count + +(** Test functions *) +let test_setup_teardown ~sw:_ _env _s = () + +let count = 1000 +(* Tests *) +let test_send_receive ~sw:_ env (_, s1, s2) = + all_ok [ + send env s2 count; + recv env s1 count; + ] + +let test_msend_mreceive ~sw:_ env (_, s1, s2) = + all_ok [ + send env s2 count; send env s2 count; send env s2 count; send env s2 count; + recv env s1 count; recv env s1 count; recv env s1 count; recv env s1 count; + ] + +let test_mix ~sw:_ env (_, s1, s2) = + all_ok [ + send env s2 count; recv env s1 count; + send env s1 count; recv env s2 count; + send env s2 count; recv env s1 count; + send env s1 count; recv env s2 count; + send env s2 count; recv env s1 count; + ] + +let test_slow_send ~sw:_ env (_, s1, s2) = + all_ok [ + recv env ~delay:0.0001 s2 count; + send env s1 (count / 5); + send env s1 (count / 5); + send env s1 (count / 5); + send env s1 (count / 5); + send env s1 (count / 5); + ] + +let test_slow_receive ~sw:_ env (_, s1, s2) = + all_ok [ + send env ~delay:0.0001 s2 count; + recv env s1 (count / 5); + recv env s1 (count / 5); + recv env s1 (count / 5); + recv env s1 (count / 5); + recv env s1 (count / 5); + ] + +let test_slow_mix1 ~sw:_ env (_, s1, s2) = + all_ok [ + send env ~delay:0.0001 s2 count; recv env ~delay:0.0002 s1 count; + send env ~delay:0.0001 s1 count; recv env ~delay:0.0002 s2 count; + send env ~delay:0.0001 s2 count; recv env ~delay:0.0002 s1 count; + send env ~delay:0.0001 s1 count; recv env ~delay:0.0002 s2 count; + ] + +let test_slow_mix2 ~sw:_ env (_, s1, s2) = + all_ok [ + send env ~delay:0.0002 s2 count; recv env ~delay:0.0001 s1 count; + send env ~delay:0.0002 s1 count; recv env ~delay:0.0001 s2 count; + send env ~delay:0.0002 s2 count; recv env ~delay:0.0001 s1 count; + send env ~delay:0.0002 s1 count; recv env ~delay:0.0001 s2 count; + ] + + +let suite () = + let bracket test = + let f sw env = + let s = setup ~sw env in + match test ~sw env s with + | v -> teardown ~sw env s; v + | exception e -> teardown ~sw env s; raise e + in + fun () -> Eio_linux.run (fun env -> + Eio.Switch.run (fun sw -> f sw env)) + in + + __MODULE__ >::: [ + "test_setup_teardown" >:: bracket test_setup_teardown; + "test_send_receive" >:: bracket test_send_receive; + "test_msend_mreceive" >:: bracket test_msend_mreceive; + "test_mix" >:: bracket test_mix; + "test_slow_send" >:: bracket test_slow_send; + "test_slow_receive" >:: bracket test_slow_receive; + "test_slow_mix" >:: bracket test_slow_mix1; + "test_slow_mix" >:: bracket test_slow_mix2; + "test_send_receive" >:: bracket test_send_receive; + ] + + +let () = + run_test_tt_main (suite ()) |> ignore