Skip to content

Commit 499d03c

Browse files
committed
Make Eio.Promise lock-free
This is slightly faster and simplifies the code.
1 parent e63eff8 commit 499d03c

File tree

1 file changed

+26
-75
lines changed

1 file changed

+26
-75
lines changed

lib_eio/core/promise.ml

Lines changed: 26 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,10 @@
1-
(* Note on thread-safety
2-
3-
Promises can be shared between domains, so everything here must be thread-safe.
4-
5-
Wrapping everything in a mutex would be one way to do that, but that makes reads
6-
slow, and only one domain would be able to read at a time.
7-
8-
Instead, we use an Atomic to hold the state, plus an additional mutex for the waiters
9-
while in the Unresolved state. This makes resolved promises faster (at the cost of
10-
making operations on unresolved promises a bit slower). It also makes reasoning about
11-
the code more fun.
12-
13-
We can think of atomics and mutexes as "boxes", containing values and
14-
invariants. To use them, you open the box to get access to the contents,
15-
then close the box afterwards, restoring the invariant. For mutexes,
16-
open/close is lock/unlock. For atomics, every operation implicitly opens and
17-
closes the box. Any number of callers can share a reference to the box
18-
itself; the runtime ensures a box can only be opened by one user at a time.
19-
20-
We can hold a full reference to something (meaning no-one else has access to it
21-
and we can mutate it), or a fraction (giving us read-only access but also
22-
ensuring that no-one else can mutate it either). *)
23-
241
type 'a state =
252
| Resolved of 'a
26-
| Unresolved of 'a Waiters.t * Mutex.t
27-
(* The Unresolved state's mutex box contains:
28-
- Full access to the Waiters.
29-
- Half access to the promise's state.
30-
- The invariant that if the promise is resolved then the waiters list is empty. *)
3+
| Unresolved of Broadcast.t
314

325
type !'a promise = {
336
id : Ctf.id;
34-
35-
state : 'a state Atomic.t;
36-
(* This atomic box contains either:
37-
- A non-zero share of the reference to the Resolved state.
38-
- A half-share of the reference to the Unresolved state. *)
7+
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
398
}
409

4110
type +!'a t
@@ -51,7 +20,7 @@ let of_public_resolver : 'a u -> 'a promise = Obj.magic
5120
let create_with_id id =
5221
let t = {
5322
id;
54-
state = Atomic.make (Unresolved (Waiters.create (), Mutex.create ()));
23+
state = Atomic.make (Unresolved (Broadcast.create ()));
5524
} in
5625
to_public_promise t, to_public_resolver t
5726

@@ -68,33 +37,33 @@ let create_resolved x =
6837
let await t =
6938
let t = of_public_promise t in
7039
match Atomic.get t.state with
71-
(* If the atomic is resolved, we take a share of that reference and return
72-
the remainder to the atomic (which will still be non-zero). We can then
73-
continue to know that the promise is resolved after the [Atomic.get]. *)
7440
| Resolved x ->
7541
Ctf.note_read t.id;
7642
x
77-
| Unresolved (q, mutex) ->
78-
(* We discovered that the promise was unresolved, but we can't be sure it still is,
79-
since we had to return the half-share reference to the atomic. So the [get] is
80-
just to get access to the mutex. *)
81-
Ctf.note_try_read t.id;
82-
Mutex.lock mutex;
83-
(* Having opened the mutex, we have:
84-
- Access to the waiters.
85-
- Half access to the promise's state (so we know it can't change until we close the mutex).
86-
- The mutex invariant. *)
43+
| Unresolved b ->
44+
Suspend.enter (fun ctx enqueue ->
45+
match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
46+
| None -> () (* We got resumed immediately *)
47+
| Some request ->
48+
match Atomic.get t.state with
49+
| Resolved _ ->
50+
(* The promise was resolved as we were suspending.
51+
Resume now if we haven't already done so. *)
52+
if Broadcast.cancel request then enqueue (Ok ())
53+
| Unresolved _ ->
54+
(* We observed the promise to be still unresolved after registering a waiter.
55+
Therefore any resolution must happen after we were registered and we will be notified. *)
56+
Ctf.note_try_read t.id;
57+
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
58+
if Broadcast.cancel request then enqueue (Error ex)
59+
(* else already resumed *)
60+
)
61+
);
8762
match Atomic.get t.state with
88-
| Unresolved _ ->
89-
(* The promise is unresolved, and can't change while we hold the mutex.
90-
It's therefore safe to add a new waiter (and let [Waiters.await] close the mutex). *)
91-
Waiters.await ~mutex:(Some mutex) q t.id
92-
(* Otherwise, the promise was resolved by the time we took the lock.
93-
Release the lock (which is fine, as we didn't change anything). *)
9463
| Resolved x ->
95-
Mutex.unlock mutex;
9664
Ctf.note_read t.id;
9765
x
66+
| Unresolved _ -> assert false
9867

9968
let await_exn t =
10069
match await t with
@@ -105,30 +74,12 @@ let resolve t v =
10574
let rec resolve' t v =
10675
match Atomic.get t.state with
10776
| Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
108-
| Unresolved (q, mutex) as prev ->
109-
(* The above [get] just gets us access to the mutex;
110-
By the time we get here, the promise may have become resolved. *)
111-
Mutex.lock mutex;
112-
(* Having opened the mutex, we have:
113-
- Access to the waiters.
114-
- Half access to the promise's state (so we know it can't change until we close the mutex).
115-
- The mutex invariant.
116-
Now we open the atomic again, getting the other half access. Together,
117-
this gives us full access to the state (i.e. no-one else can be using
118-
it), allowing us to change it.
119-
Note: we don't actually need an atomic CAS here, just a get and a set
120-
would do, but this seems simplest. *)
77+
| Unresolved b as prev ->
12178
if Atomic.compare_and_set t.state prev (Resolved v) then (
122-
(* The atomic now has half-access to the fullfilled state (which counts
123-
as non-zero), and we have the other half. Now we need to restore the
124-
mutex invariant by clearing the wakers. *)
12579
Ctf.note_resolved t.id ~ex:None;
126-
Waiters.wake_all q v;
127-
Mutex.unlock mutex
80+
Broadcast.resume_all b
12881
) else (
129-
(* Otherwise, the promise was already resolved when we opened the mutex.
130-
Close it without any changes and retry. *)
131-
Mutex.unlock mutex;
82+
(* Otherwise, the promise was already resolved. Retry (to get the error). *)
13283
resolve' t v
13384
)
13485
in

0 commit comments

Comments
 (0)