Skip to content

Commit c2c780c

Browse files
committed
WIP: Add blocking
1 parent cfa7db8 commit c2c780c

File tree

7 files changed

+261
-48
lines changed

7 files changed

+261
-48
lines changed

README.md

Lines changed: 84 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ is distributed under the [ISC license](LICENSE.md).
5959
- [Understanding transactions](#understanding-transactions)
6060
- [A three-stack lock-free queue](#a-three-stack-lock-free-queue)
6161
- [A rehashable lock-free hash table](#a-rehashable-lock-free-hash-table)
62+
- [Blocking](#blocking)
6263
- [Beware of torn reads](#beware-of-torn-reads)
6364
- [Development](#development)
6465

@@ -523,25 +524,6 @@ Or transfer elements between different transactional data structures:
523524
The ability to compose transactions allows algorithms and data-structures to be
524525
used for a wider variety of purposes.
525526

526-
#### About transactions
527-
528-
The transaction mechanism provided by **kcas** is quite intentionally designed
529-
to be very simple and efficient. This also means that it cannot provide certain
530-
features, because adding such features would either add significant dependencies
531-
or overheads to the otherwise simple and efficient implementation. In
532-
particular, the transactions provided by **kcas** do not directly provide
533-
blocking or the ability to wait for changes to shared memory locations before
534-
retrying a transaction. The way
535-
[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html#val-commit)
536-
works is that it simply retries the transaction in case it failed. To avoid
537-
contention, a
538-
[`backoff`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Backoff/index.html)
539-
mechanism is used, but otherwise
540-
[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html#val-commit)
541-
will essentially perform a
542-
[busy-wait](https://en.wikipedia.org/wiki/Busy_waiting), which should usually be
543-
avoided.
544-
545527
### Programming with explicit transaction log passing
546528

547529
The [`Xt`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html)
@@ -1494,6 +1476,89 @@ What we have here is a lock-free hash table with rehashing that should not be
14941476
highly prone to starvation. In other respects this is a fairly naive hash table
14951477
implementation. You might want to think about various ways to improve upon it.
14961478

1479+
### Blocking
1480+
1481+
Consider the following approach to unconditionally pop an element from a stack:
1482+
1483+
```ocaml
1484+
# let pop ~xt stack =
1485+
match Xt.update ~xt stack tl_safe with
1486+
| [] -> raise Exit
1487+
| elem :: _ -> elem
1488+
val pop : xt:'a Xt.t -> 'b list Loc.t -> 'b = <fun>
1489+
```
1490+
1491+
A transaction that attempts to use the above on an empty stack will keep on
1492+
retrying until some other domain pushes an element to the stack. This sort of
1493+
[busy-wait](https://en.wikipedia.org/wiki/Busy_waiting) should generally be
1494+
avoided.
1495+
1496+
```ocaml
1497+
# let stack () : _ stack = Loc.make ~awaitable:true []
1498+
val stack : unit -> 'a stack = <fun>
1499+
```
1500+
1501+
```ocaml
1502+
# let scheduler : Xt.scheduler =
1503+
let open struct
1504+
type t = {
1505+
mutex : Mutex.t;
1506+
condition : Condition.t;
1507+
mutable signaled : bool;
1508+
}
1509+
end in
1510+
let key = Domain.DLS.new_key @@ fun () -> {
1511+
mutex = Mutex.create ();
1512+
condition = Condition.create ();
1513+
signaled = false;
1514+
} in
1515+
fun suspend ->
1516+
let t = Domain.DLS.get key in
1517+
suspend (fun () ->
1518+
Mutex.lock t.mutex;
1519+
t.signaled <- true;
1520+
Mutex.unlock t.mutex;
1521+
Condition.broadcast t.condition);
1522+
Mutex.lock t.mutex;
1523+
while not t.signaled do
1524+
Condition.wait t.condition t.mutex
1525+
done;
1526+
t.signaled <- false;
1527+
Mutex.unlock t.mutex
1528+
val scheduler : Xt.scheduler = <fun>
1529+
```
1530+
1531+
```ocaml
1532+
# let push ~xt stack elem = Xt.of_tx ~xt (push stack elem)
1533+
val push : xt:'a Xt.t -> 'b list Loc.t -> 'b -> unit = <fun>
1534+
```
1535+
1536+
```ocaml
1537+
# let input_stack : int stack = stack ()
1538+
val input_stack : int stack = <abstr>
1539+
# let output_stack : int stack = stack ()
1540+
val output_stack : int stack = <abstr>
1541+
```
1542+
1543+
```ocaml
1544+
# let other = Domain.spawn @@ fun () ->
1545+
let x = Xt.commit ~scheduler { tx = pop input_stack } in
1546+
Xt.commit { tx = push output_stack x }
1547+
val other : unit Domain.t = <abstr>
1548+
```
1549+
1550+
```ocaml
1551+
# Xt.commit { tx = push input_stack 42 }
1552+
- : unit = ()
1553+
# Xt.commit ~scheduler { tx = pop output_stack }
1554+
- : int = 42
1555+
```
1556+
1557+
```ocaml
1558+
# Domain.join other
1559+
- : unit = ()
1560+
```
1561+
14971562
### Beware of torn reads
14981563

14991564
The algorithm underlying **kcas** ensures that it is not possible to read

src/kcas.ml

Lines changed: 142 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55

66
module Backoff = Backoff
77

8+
module AwaitableId = struct
9+
let id = Atomic.make (Int.min_int asr 1)
10+
let get_unique () = Atomic.fetch_and_add id 1
11+
let is id = id < 0
12+
let to_awaiters id = Int.min_int - 1 - id
13+
end
14+
815
module Id = struct
916
let id = Atomic.make 1
1017
let get_unique () = Atomic.fetch_and_add id 1
@@ -30,9 +37,11 @@ end = struct
3037
[@@inline]
3138
end
3239

40+
type awaiter = [ `Init | `Resumed | `Waiting of unit -> unit ] Atomic.t
3341
type determined = [ `After | `Before ]
3442

35-
type 'a loc = { state : 'a state Atomic.t; id : int }
43+
type 'a loc = { state : 'a state Atomic.t; id : int; awaiters : awaiters }
44+
and awaiters = awaiter list loc
3645
and 'a state = { mutable before : 'a; mutable after : 'a; mutable casn : casn }
3746
and cass = CASN : 'a loc * 'a state * cass * cass -> cass | NIL : cass
3847
and casn = status Atomic.t
@@ -214,12 +223,23 @@ let cas loc before state =
214223
let inc x = x + 1
215224
let dec x = x - 1
216225

226+
let make_awaiters id =
227+
let state = Atomic.make @@ new_state [] and id = AwaitableId.to_awaiters id in
228+
let rec awaiters = { state; id; awaiters } in
229+
awaiters
230+
217231
module Loc = struct
218232
type 'a t = 'a loc
219233

220-
let make after =
221-
{ state = Atomic.make @@ new_state after; id = Id.get_unique () }
234+
let make ?(awaitable = false) after =
235+
let state = Atomic.make @@ new_state after in
236+
let id =
237+
if awaitable then AwaitableId.get_unique () else Id.get_unique ()
238+
in
239+
let awaiters = if awaitable then make_awaiters id else Obj.magic () in
240+
{ state; id; awaiters }
222241

242+
let is_awaitable loc = AwaitableId.is loc.id
223243
let get_id loc = loc.id [@@inline]
224244

225245
let get loc =
@@ -539,6 +559,37 @@ module Xt = struct
539559

540560
let call { tx } = tx [@@inline]
541561

562+
let rec take_awaiters ~xt awaiters = function
563+
| NIL -> awaiters
564+
| CASN (loc, state, l, r) ->
565+
let awaiters =
566+
if l != NIL then take_awaiters ~xt awaiters l else awaiters
567+
in
568+
if AwaitableId.is loc.id then
569+
let awaiters =
570+
if state.before != state.after then
571+
match exchange ~xt loc.awaiters [] with
572+
| [] -> awaiters
573+
| aws -> aws :: awaiters
574+
else awaiters
575+
in
576+
take_awaiters ~xt awaiters r
577+
else awaiters
578+
579+
let resume_awaiters awaiters =
580+
awaiters
581+
|> List.iter @@ List.iter
582+
@@ fun awaiter ->
583+
match Atomic.exchange awaiter `Resumed with
584+
| `Waiting resume -> resume ()
585+
| _ -> ()
586+
[@@inline never]
587+
588+
let resume_awaiters = function
589+
| [] -> ()
590+
| awaiters -> resume_awaiters awaiters
591+
[@@inline]
592+
542593
let attempt (mode : Mode.t) tx =
543594
let xt =
544595
let casn = Atomic.make (mode :> status)
@@ -547,6 +598,7 @@ module Xt = struct
547598
{ casn; cass; post_commit }
548599
in
549600
let result = tx ~xt in
601+
let awaiters = take_awaiters ~xt [] xt.cass in
550602
match xt.cass with
551603
| NIL -> Action.run xt.post_commit result
552604
| CASN (loc, state, NIL, NIL) ->
@@ -557,22 +609,97 @@ module Xt = struct
557609
if cas loc before state then Action.run xt.post_commit result
558610
else exit ()
559611
| cass ->
560-
if determine_for_owner xt.casn cass then
561-
Action.run xt.post_commit result
612+
if determine_for_owner xt.casn cass then (
613+
resume_awaiters awaiters;
614+
Action.run xt.post_commit result)
562615
else exit ()
563616

564-
let rec commit backoff mode tx =
565-
match attempt mode tx with
566-
| result -> result
567-
| exception Mode.Interference ->
568-
commit (Backoff.once backoff) Mode.lock_free tx
569-
| exception Exit -> commit (Backoff.once backoff) mode tx
617+
let attempt ?(mode = Mode.lock_free) tx = attempt mode tx.tx [@@inline]
570618

571-
let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx =
572-
commit backoff mode tx.tx
573-
[@@inline]
619+
type scheduler = ((unit -> unit) -> unit) -> unit
620+
621+
let rec add_awaiter ~xt awaiter = function
622+
| NIL -> ()
623+
| CASN (loc, _, l, r) ->
624+
if l != NIL then add_awaiter ~xt awaiter l;
625+
if AwaitableId.is loc.id then (
626+
modify ~xt loc.awaiters (List.cons awaiter);
627+
add_awaiter ~xt awaiter r)
628+
629+
let rec reset casn = function
630+
| NIL -> NIL
631+
| CASN (loc, state, l, r) as old ->
632+
let l' = reset casn l and r' = reset casn r in
633+
if is_cmp casn state then
634+
if l == l' && r == r' then old else CASN (loc, state, l', r')
635+
else
636+
let state' = Atomic.get loc.state in
637+
let current = eval state' in
638+
if current != state.before then exit () else CASN (loc, state', l, r)
574639

575-
let attempt ?(mode = Mode.lock_free) tx = attempt mode tx.tx [@@inline]
640+
let rec commit backoff (mode : Mode.t) scheduler_opt tx =
641+
let xt =
642+
let casn = Atomic.make (mode :> status)
643+
and cass = NIL
644+
and post_commit = Action.noop in
645+
{ casn; cass; post_commit }
646+
in
647+
match tx ~xt with
648+
| result -> (
649+
let awaiters = take_awaiters ~xt [] xt.cass in
650+
match xt.cass with
651+
| NIL -> Action.run xt.post_commit result
652+
| CASN (loc, state, NIL, NIL) ->
653+
if is_cmp xt.casn state then Action.run xt.post_commit result
654+
else
655+
let before = state.before in
656+
state.before <- state.after;
657+
if cas loc before state then Action.run xt.post_commit result
658+
else commit (Backoff.once backoff) mode scheduler_opt tx
659+
| cass -> (
660+
match determine_for_owner xt.casn cass with
661+
| true ->
662+
resume_awaiters awaiters;
663+
Action.run xt.post_commit result
664+
| false -> commit (Backoff.once backoff) mode scheduler_opt tx
665+
| exception Mode.Interference ->
666+
commit (Backoff.once backoff) Mode.lock_free scheduler_opt tx))
667+
| exception Exit -> (
668+
match scheduler_opt with
669+
| None -> commit (Backoff.once backoff) mode scheduler_opt tx
670+
| Some scheduler -> (
671+
match reset xt.casn xt.cass with
672+
| cass -> (
673+
xt.cass <- cass;
674+
let self = Atomic.make `Init in
675+
add_awaiter ~xt self cass;
676+
if xt.cass == cass then
677+
commit (Backoff.once backoff) mode scheduler_opt tx
678+
else
679+
match determine_for_owner xt.casn xt.cass with
680+
| true ->
681+
if Atomic.get self == `Init then
682+
scheduler (fun resume ->
683+
if
684+
not
685+
(Atomic.compare_and_set self `Init
686+
(`Waiting resume))
687+
then resume ());
688+
(* TODO: remove awaiters *)
689+
commit
690+
(Backoff.once (Backoff.reset backoff))
691+
mode scheduler_opt tx
692+
| false -> commit (Backoff.once backoff) mode scheduler_opt tx
693+
| exception Mode.Interference ->
694+
commit (Backoff.once backoff) Mode.lock_free scheduler_opt
695+
tx)
696+
| exception Exit ->
697+
commit (Backoff.once backoff) mode scheduler_opt tx))
698+
699+
let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free)
700+
?scheduler { tx } =
701+
commit backoff mode scheduler tx
702+
[@@inline]
576703

577704
let of_tx tx ~xt =
578705
let (_, cass, post_commit), x = tx (xt.casn, xt.cass, xt.post_commit) in

src/kcas.mli

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@ module Loc : sig
1313
type !'a t
1414
(** Type of shared memory locations. *)
1515

16-
val make : 'a -> 'a t
16+
val make : ?awaitable:bool -> 'a -> 'a t
1717
(** [make initial] creates a new shared memory location with the [initial]
18-
value. *)
18+
value.
19+
20+
By default a location is considered to be unawaitable to avoid overheads.
21+
{!Xt.commit} cannot block to await for changes to unawaitable locations.
22+
To make a location awaitable, simply pass [~awaitable:true]. *)
23+
24+
val is_awaitable : 'a t -> bool
25+
(** [is_awaitable r] determines whether the shared memory location [r] is
26+
awaitable. *)
1927

2028
val get_id : 'a t -> int
2129
(** [get_id r] returns the unique id of the shared memory location [r]. *)
@@ -494,7 +502,13 @@ module Xt : sig
494502
the transaction or returns the result of the transaction. The default for
495503
[attempt] is {!Mode.lock_free}. *)
496504

497-
val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a
505+
type scheduler = ((unit -> unit) -> unit) -> unit
506+
(** Type of a scheduler that can be called to suspend execution of the current
507+
domain or fiber until the resume function given by the scheduler is
508+
called. *)
509+
510+
val commit :
511+
?backoff:Backoff.t -> ?mode:Mode.t -> ?scheduler:scheduler -> 'a tx -> 'a
498512
(** [commit tx] repeats [attempt tx] until it does not raise [Exit] or
499513
{!Mode.Interference} and then either returns or raises whatever attempt
500514
returned or raised.
@@ -505,7 +519,8 @@ module Xt : sig
505519
506520
Note that, aside from using exponential backoff to reduce contention, the
507521
transaction mechanism has no way to intelligently wait until shared memory
508-
locations are modified by other domains. *)
522+
locations are modified by other domains unless a suitable {!scheduler} is
523+
given. *)
509524

510525
(** {1 Conversions} *)
511526

0 commit comments

Comments
 (0)