Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions bench/bench_promise.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ and response = {
next_request : request Promise.u;
}

(* Simulate other work in the domain, and also prevent it from going to sleep.
Otherwise, we're just measuring how long it takes the OS to wake a sleeping thread. *)
let rec spin () =
Fiber.yield ();
spin ()

(* A client and server exchange these payload values.
Each contains the current message and a resolver which the other party can use to reply. *)

Expand Down Expand Up @@ -49,7 +55,11 @@ let bench_resolved ~clock ~n_iters =
Printf.printf "Reading a resolved promise: %.3f ns\n%!" (1e9 *. (t1 -. t0) /. float n_iters);
assert (!t = n_iters)

let run_bench ~domain_mgr ~clock ~use_domains ~n_iters =
let maybe_spin v fn =
if v then Fiber.first spin fn
else fn ()

let run_bench ~domain_mgr ~spin ~clock ~use_domains ~n_iters =
let init_p, init_r = Promise.create () in
Gc.full_major ();
let _minor0, prom0, _major0 = Gc.counters () in
Expand All @@ -58,28 +68,30 @@ let run_bench ~domain_mgr ~clock ~use_domains ~n_iters =
(fun () ->
if use_domains then (
Eio.Domain_manager.run domain_mgr @@ fun () ->
run_server ~n_iters ~i:0 init_r
maybe_spin spin (fun () -> run_server ~n_iters ~i:0 init_r)
) else (
run_server ~n_iters ~i:0 init_r
maybe_spin spin (fun () -> run_server ~n_iters ~i:0 init_r)
)
)
(fun () ->
run_client ~n_iters ~i:0 init_p
maybe_spin spin (fun () -> run_client ~n_iters ~i:0 init_p)
);
let t1 = Eio.Time.now clock in
let time_total = t1 -. t0 in
let time_per_iter = time_total /. float n_iters in
let _minor1, prom1, _major1 = Gc.counters () in
let prom = prom1 -. prom0 in
Printf.printf "%11b, %8d, %8.2f, %13.4f\n%!" use_domains n_iters (1e9 *. time_per_iter) (prom /. float n_iters)
let domains = Printf.sprintf "%b/%b" use_domains spin in
Printf.printf "%11s, %8d, %8.2f, %13.4f\n%!" domains n_iters (1e9 *. time_per_iter) (prom /. float n_iters)

let main ~domain_mgr ~clock =
bench_resolved ~clock ~n_iters:(10_000_000);
Printf.printf "use_domains, n_iters, ns/iter, promoted/iter\n%!";
[false, 1_000_000;
true, 100_000]
|> List.iter (fun (use_domains, n_iters) ->
run_bench ~domain_mgr ~clock ~use_domains ~n_iters
Printf.printf "domains/spin, n_iters, ns/iter, promoted/iter\n%!";
[false, false, 1_000_000;
true, true, 100_000;
true, false, 100_000]
|> List.iter (fun (use_domains, spin, n_iters) ->
run_bench ~domain_mgr ~spin ~clock ~use_domains ~n_iters
)

let () =
Expand Down
101 changes: 26 additions & 75 deletions lib_eio/core/promise.ml
Original file line number Diff line number Diff line change
@@ -1,41 +1,10 @@
(* Note on thread-safety

Promises can be shared between domains, so everything here must be thread-safe.

Wrapping everything in a mutex would be one way to do that, but that makes reads
slow, and only one domain would be able to read at a time.

Instead, we use an Atomic to hold the state, plus an additional mutex for the waiters
while in the Unresolved state. This makes resolved promises faster (at the cost of
making operations on unresolved promises a bit slower). It also makes reasoning about
the code more fun.

We can think of atomics and mutexes as "boxes", containing values and
invariants. To use them, you open the box to get access to the contents,
then close the box afterwards, restoring the invariant. For mutexes,
open/close is lock/unlock. For atomics, every operation implicitly opens and
closes the box. Any number of callers can share a reference to the box
itself; the runtime ensures a box can only be opened by one user at a time.

We can hold a full reference to something (meaning no-one else has access to it
and we can mutate it), or a fraction (giving us read-only access but also
ensuring that no-one else can mutate it either). *)

type 'a state =
| Resolved of 'a
| Unresolved of 'a Waiters.t * Mutex.t
(* The Unresolved state's mutex box contains:
- Full access to the Waiters.
- Half access to the promise's state.
- The invariant that if the promise is resolved then the waiters list is empty. *)
| Unresolved of Broadcast.t

type !'a promise = {
id : Ctf.id;

state : 'a state Atomic.t;
(* This atomic box contains either:
- A non-zero share of the reference to the Resolved state.
- A half-share of the reference to the Unresolved state. *)
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
}

type +!'a t
Expand All @@ -51,7 +20,7 @@ let of_public_resolver : 'a u -> 'a promise = Obj.magic
let create_with_id id =
let t = {
id;
state = Atomic.make (Unresolved (Waiters.create (), Mutex.create ()));
state = Atomic.make (Unresolved (Broadcast.create ()));
} in
to_public_promise t, to_public_resolver t

Expand All @@ -68,33 +37,33 @@ let create_resolved x =
let await t =
let t = of_public_promise t in
match Atomic.get t.state with
(* If the atomic is resolved, we take a share of that reference and return
the remainder to the atomic (which will still be non-zero). We can then
continue to know that the promise is resolved after the [Atomic.get]. *)
| Resolved x ->
Ctf.note_read t.id;
x
| Unresolved (q, mutex) ->
(* We discovered that the promise was unresolved, but we can't be sure it still is,
since we had to return the half-share reference to the atomic. So the [get] is
just to get access to the mutex. *)
Ctf.note_try_read t.id;
Mutex.lock mutex;
(* Having opened the mutex, we have:
- Access to the waiters.
- Half access to the promise's state (so we know it can't change until we close the mutex).
- The mutex invariant. *)
| Unresolved b ->
Suspend.enter (fun ctx enqueue ->
match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
| None -> () (* We got resumed immediately *)
| Some request ->
match Atomic.get t.state with
| Resolved _ ->
(* The promise was resolved as we were suspending.
Resume now if we haven't already done so. *)
if Broadcast.cancel request then enqueue (Ok ())
| Unresolved _ ->
(* We observed the promise to be still unresolved after registering a waiter.
Therefore any resolution must happen after we were registered and we will be notified. *)
Ctf.note_try_read t.id;
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
(* else already resumed *)
)
);
match Atomic.get t.state with
| Unresolved _ ->
(* The promise is unresolved, and can't change while we hold the mutex.
It's therefore safe to add a new waiter (and let [Waiters.await] close the mutex). *)
Waiters.await ~mutex:(Some mutex) q t.id
(* Otherwise, the promise was resolved by the time we took the lock.
Release the lock (which is fine, as we didn't change anything). *)
| Resolved x ->
Mutex.unlock mutex;
Ctf.note_read t.id;
x
| Unresolved _ -> assert false

let await_exn t =
match await t with
Expand All @@ -105,30 +74,12 @@ let resolve t v =
let rec resolve' t v =
match Atomic.get t.state with
| Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
| Unresolved (q, mutex) as prev ->
(* The above [get] just gets us access to the mutex;
By the time we get here, the promise may have become resolved. *)
Mutex.lock mutex;
(* Having opened the mutex, we have:
- Access to the waiters.
- Half access to the promise's state (so we know it can't change until we close the mutex).
- The mutex invariant.
Now we open the atomic again, getting the other half access. Together,
this gives us full access to the state (i.e. no-one else can be using
it), allowing us to change it.
Note: we don't actually need an atomic CAS here, just a get and a set
would do, but this seems simplest. *)
| Unresolved b as prev ->
if Atomic.compare_and_set t.state prev (Resolved v) then (
(* The atomic now has half-access to the fullfilled state (which counts
as non-zero), and we have the other half. Now we need to restore the
mutex invariant by clearing the wakers. *)
Ctf.note_resolved t.id ~ex:None;
Waiters.wake_all q v;
Mutex.unlock mutex
Broadcast.resume_all b
) else (
(* Otherwise, the promise was already resolved when we opened the mutex.
Close it without any changes and retry. *)
Mutex.unlock mutex;
(* Otherwise, the promise was already resolved. Retry (to get the error). *)
resolve' t v
)
in
Expand Down