Commit 9e6ea54

WIP: Add blocking
1 parent 30ccc3c commit 9e6ea54

3 files changed

+232
-37
lines changed

README.md

Lines changed: 84 additions & 19 deletions
@@ -52,6 +52,7 @@ is distributed under the [ISC license](LICENSE.md).
5252
- [Race to cooperate](#race-to-cooperate)
5353
- [A three-stack lock-free queue](#a-three-stack-lock-free-queue)
5454
- [A rehashable lock-free hash table](#a-rehashable-lock-free-hash-table)
55+
- [Blocking](#blocking)
5556
- [Development](#development)
5657

5758
## A quick tour
@@ -515,25 +516,6 @@ Or transfer elements between different transactional data structures:
515516
The ability to compose transactions allows algorithms and data-structures to be
516517
used for a wider variety of purposes.
517518

518-
#### About transactions
519-
520-
The transaction mechanism provided by **kcas** is quite intentionally designed
521-
to be very simple and efficient. This also means that it cannot provide certain
522-
features, because adding such features would either add significant dependencies
523-
or overheads to the otherwise simple and efficient implementation. In
524-
particular, the transactions provided by **kcas** do not directly provide
525-
blocking or the ability to wait for changes to shared memory locations before
526-
retrying a transaction. The way
527-
[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html#val-commit)
528-
works is that it simply retries the transaction in case it failed. To avoid
529-
contention, a
530-
[`backoff`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Backoff/index.html)
531-
mechanism is used, but otherwise
532-
[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html#val-commit)
533-
will essentially perform a
534-
[busy-wait](https://en.wikipedia.org/wiki/Busy_waiting), which should usually be
535-
avoided.
536-
537519
### Programming with explicit transaction log passing
538520

539521
The [`Xt`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html)
@@ -1230,6 +1212,89 @@ What we have here is a lock-free hash table with rehashing that should not be
12301212
highly prone to starvation. In other respects this is a fairly naive hash table
12311213
implementation. You might want to think about various ways to improve upon it.
12321214

1215+
### Blocking
1216+
1217+
Consider the following approach to unconditionally pop an element from a stack:
1218+
1219+
```ocaml
1220+
# let pop ~xt stack =
1221+
match Xt.update ~xt stack tl_safe with
1222+
| [] -> raise Exit
1223+
| elem :: _ -> elem
1224+
val pop : xt:'a Xt.t -> 'b list Loc.t -> 'b = <fun>
1225+
```
1226+
1227+
A transaction that attempts to use the above on an empty stack will keep on
1228+
retrying until some other domain pushes an element to the stack. This sort of
1229+
[busy-wait](https://en.wikipedia.org/wiki/Busy_waiting) should generally be
1230+
avoided.
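
To make the cost concrete, here is a minimal sketch of a busy-waiting pop written against the standard library's `Atomic` only, not against **kcas**. The name `busy_pop` is hypothetical and the code assumes OCaml 5 for `Domain.cpu_relax`. On an empty stack the loop spins and burns CPU time until some other domain pushes an element:

```ocaml
(* Hypothetical illustration, not part of kcas: pop from an atomic list,
   spinning for as long as the list is empty. *)
let rec busy_pop (stack : 'a list Atomic.t) : 'a =
  match Atomic.get stack with
  | [] ->
    (* Nothing to pop: hint the CPU that we are spinning, then retry. *)
    Domain.cpu_relax ();
    busy_pop stack
  | elem :: rest as old ->
    (* Install the tail only if no other domain changed the stack. *)
    if Atomic.compare_and_set stack old rest then elem else busy_pop stack
```

The loop never sleeps, so a domain waiting for a rarely filled stack occupies a core the whole time.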
1231+
1232+
```ocaml
1233+
# let stack () : _ stack = Loc.make ~awaitable:true []
1234+
val stack : unit -> 'a stack = <fun>
1235+
```
1236+
1237+
```ocaml
1238+
# let scheduler : Xt.scheduler =
1239+
let open struct
1240+
type t = {
1241+
mutex : Mutex.t;
1242+
condition : Condition.t;
1243+
mutable signaled : bool;
1244+
}
1245+
end in
1246+
let key = Domain.DLS.new_key @@ fun () -> {
1247+
mutex = Mutex.create ();
1248+
condition = Condition.create ();
1249+
signaled = false;
1250+
} in
1251+
fun suspend ->
1252+
let t = Domain.DLS.get key in
1253+
suspend (fun () ->
1254+
Mutex.lock t.mutex;
1255+
t.signaled <- true;
1256+
Mutex.unlock t.mutex;
1257+
Condition.broadcast t.condition);
1258+
Mutex.lock t.mutex;
1259+
while not t.signaled do
1260+
Condition.wait t.condition t.mutex
1261+
done;
1262+
t.signaled <- false;
1263+
Mutex.unlock t.mutex
1264+
val scheduler : Xt.scheduler = <fun>
1265+
```
1266+
1267+
```ocaml
1268+
# let push ~xt stack elem = Xt.of_tx ~xt (push stack elem)
1269+
val push : xt:'a Xt.t -> 'b list Loc.t -> 'b -> unit = <fun>
1270+
```
1271+
1272+
```ocaml
1273+
# let input_stack : int stack = stack ()
1274+
val input_stack : int stack = <abstr>
1275+
# let output_stack : int stack = stack ()
1276+
val output_stack : int stack = <abstr>
1277+
```
1278+
1279+
```ocaml
1280+
# let other = Domain.spawn @@ fun () ->
1281+
let x = Xt.commit ~scheduler { tx = pop input_stack } in
1282+
Xt.commit { tx = push output_stack x }
1283+
val other : unit Domain.t = <abstr>
1284+
```
1285+
1286+
```ocaml
1287+
# Xt.commit { tx = push input_stack 42 }
1288+
- : unit = ()
1289+
# Xt.commit ~scheduler { tx = pop output_stack }
1290+
- : int = 42
1291+
```
1292+
1293+
```ocaml
1294+
# Domain.join other
1295+
- : unit = ()
1296+
```
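
The suspend and resume protocol behind the `scheduler` above can also be tried in isolation. The following is a minimal sketch that uses only the OCaml 5 standard library and does not depend on **kcas**. The `scheduler` type is copied from `Xt.scheduler` as declared in this commit, while `blocking_scheduler` and `resumed_by_other_domain` are hypothetical names. As a simplification it allocates a fresh mutex and condition per call instead of caching them in domain-local storage:

```ocaml
(* Same shape as [Xt.scheduler] in this commit: the scheduler hands a
   [resume] function to [suspend] and then blocks until it is called. *)
type scheduler = ((unit -> unit) -> unit) -> unit

(* Hypothetical blocking scheduler built on Mutex and Condition. *)
let blocking_scheduler : scheduler =
 fun suspend ->
  let mutex = Mutex.create () in
  let condition = Condition.create () in
  let signaled = ref false in
  let resume () =
    Mutex.lock mutex;
    signaled := true;
    Mutex.unlock mutex;
    Condition.broadcast condition
  in
  (* Publish [resume] first; it may run before we start waiting. *)
  suspend resume;
  Mutex.lock mutex;
  (* The flag guards against spurious wakeups and an early [resume]. *)
  while not !signaled do
    Condition.wait condition mutex
  done;
  Mutex.unlock mutex

(* Hypothetical driver: another domain picks up [resume] and calls it. *)
let resumed_by_other_domain () =
  let slot = Atomic.make None in
  let other =
    Domain.spawn (fun () ->
        let rec loop () =
          match Atomic.get slot with
          | Some resume -> resume ()
          | None ->
            Domain.cpu_relax ();
            loop ()
        in
        loop ())
  in
  blocking_scheduler (fun resume -> Atomic.set slot (Some resume));
  Domain.join other;
  true
```

Because `resume` records the wakeup in `signaled` under the mutex, the calling domain returns promptly whether `resume` runs before or after it starts waiting.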
1297+
12331298
## Development
12341299

12351300
### Formatting

src/kcas.ml

Lines changed: 133 additions & 14 deletions
@@ -5,14 +5,23 @@
55

66
module Backoff = Backoff
77

8+
module AwaitableId = struct
9+
let id = Atomic.make (Int.min_int asr 1)
10+
let get_unique () = Atomic.fetch_and_add id 1
11+
let is id = id < 0
12+
let to_awaiters id = Int.min_int - 1 - id
13+
end
14+
815
module Id = struct
916
let id = Atomic.make 1
1017
let get_unique () = Atomic.fetch_and_add id 1
1118
end
1219

20+
type awaiter = [ `Init | `Resumed | `Waiting of unit -> unit ] Atomic.t
1321
type determined = [ `After | `Before ]
1422

15-
type 'a loc = { state : 'a state Atomic.t; id : int }
23+
type 'a loc = { state : 'a state Atomic.t; id : int; awaiters : awaiters }
24+
and awaiters = awaiter list loc
1625
and 'a state = { mutable before : 'a; mutable after : 'a; mutable casn : casn }
1726
and cass = CASN : 'a loc * 'a state * cass * cass -> cass | NIL : cass
1827
and casn = status Atomic.t
@@ -190,11 +199,26 @@ let cas loc before state =
190199
&& Atomic.compare_and_set loc.state state' state
191200
[@@inline]
192201

202+
let make_awaiters id =
203+
let rec awaiters =
204+
{
205+
state = Atomic.make @@ new_state [];
206+
id = AwaitableId.to_awaiters id;
207+
awaiters;
208+
}
209+
in
210+
awaiters
211+
193212
module Loc = struct
194213
type 'a t = 'a loc
195214

196-
let make after =
197-
{ state = Atomic.make @@ new_state after; id = Id.get_unique () }
215+
let make ?(awaitable = false) after =
216+
let state = Atomic.make @@ new_state after in
217+
let id =
218+
if awaitable then AwaitableId.get_unique () else Id.get_unique ()
219+
in
220+
let awaiters = if awaitable then make_awaiters id else Obj.magic () in
221+
{ state; id; awaiters }
198222

199223
let get_id loc = loc.id [@@inline]
200224

@@ -446,9 +470,41 @@ module Xt = struct
446470

447471
let call { tx } = tx [@@inline]
448472

473+
let rec take_awaiters ~xt awaiters = function
474+
| NIL -> awaiters
475+
| CASN (loc, state, l, r) ->
476+
let awaiters =
477+
if l != NIL then take_awaiters ~xt awaiters l else awaiters
478+
in
479+
if AwaitableId.is loc.id then
480+
let awaiters =
481+
if state.before != state.after then
482+
match exchange ~xt loc.awaiters [] with
483+
| [] -> awaiters
484+
| aws -> aws :: awaiters
485+
else awaiters
486+
in
487+
take_awaiters ~xt awaiters r
488+
else awaiters
489+
490+
let resume_awaiters awaiters =
491+
awaiters
492+
|> List.iter @@ List.iter
493+
@@ fun awaiter ->
494+
match Atomic.exchange awaiter `Resumed with
495+
| `Waiting resume -> resume ()
496+
| _ -> ()
497+
[@@inline never]
498+
499+
let resume_awaiters = function
500+
| [] -> ()
501+
| awaiters -> resume_awaiters awaiters
502+
[@@inline]
503+
449504
let attempt (mode : Mode.t) tx =
450505
let xt = { casn = Atomic.make (mode :> status); cass = NIL } in
451506
let result = tx ~xt in
507+
let awaiters = take_awaiters ~xt [] xt.cass in
452508
match xt.cass with
453509
| NIL -> result
454510
| CASN (loc, state, NIL, NIL) ->
@@ -457,20 +513,83 @@ module Xt = struct
457513
let before = state.before in
458514
state.before <- state.after;
459515
if cas loc before state then result else exit ()
460-
| cass -> if determine_for_owner xt.casn cass then result else exit ()
516+
| cass ->
517+
if determine_for_owner xt.casn cass then (
518+
resume_awaiters awaiters;
519+
result)
520+
else exit ()
461521

462-
let rec commit backoff mode tx =
463-
match attempt mode tx with
464-
| result -> result
465-
| exception Mode.Interference ->
466-
commit (Backoff.once backoff) Mode.lock_free tx
467-
| exception Exit -> commit (Backoff.once backoff) mode tx
522+
let attempt ?(mode = Mode.lock_free) tx = attempt mode tx.tx [@@inline]
468523

469-
let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx =
470-
commit backoff mode tx.tx
471-
[@@inline]
524+
type scheduler = ((unit -> unit) -> unit) -> unit
525+
526+
let rec add_awaiter ~xt awaiter = function
527+
| NIL -> ()
528+
| CASN (loc, _, l, r) ->
529+
if l != NIL then add_awaiter ~xt awaiter l;
530+
if AwaitableId.is loc.id then (
531+
modify ~xt loc.awaiters (List.cons awaiter);
532+
add_awaiter ~xt awaiter r)
533+
534+
let rec reset casn = function
535+
| NIL -> NIL
536+
| CASN (loc, state, l, r) as old ->
537+
let l' = reset casn l and r' = reset casn r in
538+
if is_cmp casn state then
539+
if l == l' && r == r' then old else CASN (loc, state, l', r')
540+
else
541+
let state' = Atomic.get loc.state in
542+
let current = eval state' in
543+
if current != state.before then exit () else CASN (loc, state', l, r)
472544

473-
let attempt ?(mode = Mode.lock_free) tx = attempt mode tx.tx [@@inline]
545+
let rec commit backoff (mode : Mode.t) scheduler_opt tx =
546+
let xt = { casn = Atomic.make (mode :> status); cass = NIL } in
547+
match tx ~xt with
548+
| result -> (
549+
let awaiters = take_awaiters ~xt [] xt.cass in
550+
match determine_for_owner xt.casn xt.cass with
551+
| true ->
552+
resume_awaiters awaiters;
553+
result
554+
| false -> commit (Backoff.once backoff) mode scheduler_opt tx
555+
| exception Mode.Interference ->
556+
commit (Backoff.once backoff) Mode.lock_free scheduler_opt tx)
557+
| exception Exit -> (
558+
match scheduler_opt with
559+
| None -> commit (Backoff.once backoff) mode scheduler_opt tx
560+
| Some scheduler -> (
561+
match reset xt.casn xt.cass with
562+
| cass -> (
563+
xt.cass <- cass;
564+
let self = Atomic.make `Init in
565+
add_awaiter ~xt self cass;
566+
if xt.cass == cass then
567+
commit (Backoff.once backoff) mode scheduler_opt tx
568+
else
569+
match determine_for_owner xt.casn xt.cass with
570+
| true ->
571+
if Atomic.get self == `Init then
572+
scheduler (fun resume ->
573+
if
574+
not
575+
(Atomic.compare_and_set self `Init
576+
(`Waiting resume))
577+
then resume ());
578+
(* TODO: remove awaiters *)
579+
commit
580+
(Backoff.once (Backoff.reset backoff))
581+
mode scheduler_opt tx
582+
| false -> commit (Backoff.once backoff) mode scheduler_opt tx
583+
| exception Mode.Interference ->
584+
commit (Backoff.once backoff) Mode.lock_free scheduler_opt
585+
tx)
586+
| exception Exit ->
587+
commit (Backoff.once backoff) mode scheduler_opt tx))
588+
589+
let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free)
590+
?scheduler { tx } =
591+
commit backoff mode scheduler tx
592+
[@@inline]
474593

475594
let of_tx tx ~xt =
476595
let (_, cass), x = tx (xt.casn, xt.cass) in

src/kcas.mli

Lines changed: 15 additions & 4 deletions
@@ -13,9 +13,13 @@ module Loc : sig
1313
type 'a t
1414
(** Type of shared memory locations. *)
1515

16-
val make : 'a -> 'a t
16+
val make : ?awaitable:bool -> 'a -> 'a t
1717
(** [make initial] creates a new shared memory location with the [initial]
18-
value. *)
18+
value.
19+
20+
By default a location is considered to be unawaitable to avoid overheads.
21+
{!Xt.commit} cannot block to await changes to unawaitable locations.
22+
To make a location awaitable, simply pass [~awaitable:true]. *)
1923

2024
val get_id : 'a t -> int
2125
(** [get_id r] returns the unique id of the shared memory location [r]. *)
@@ -411,7 +415,13 @@ module Xt : sig
411415
the transaction or returns the result of the transaction. The default for
412416
[attempt] is {!Mode.lock_free}. *)
413417

414-
val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a
418+
type scheduler = ((unit -> unit) -> unit) -> unit
419+
(** Type of a scheduler that can be called to suspend execution of the current
420+
domain or fiber until the resume function given by the scheduler is
421+
called. *)
422+
423+
val commit :
424+
?backoff:Backoff.t -> ?mode:Mode.t -> ?scheduler:scheduler -> 'a tx -> 'a
415425
(** [commit tx] repeats [attempt tx] until it does not raise [Exit] or
416426
{!Mode.Interference} and then either returns or raises whatever attempt
417427
returned or raised.
@@ -422,7 +432,8 @@ module Xt : sig
422432
423433
Note that, aside from using exponential backoff to reduce contention, the
424434
transaction mechanism has no way to intelligently wait until shared memory
425-
locations are modified by other domains. *)
435+
locations are modified by other domains unless a suitable {!scheduler} is
436+
given. *)
426437

427438
(** {1 Conversions} *)
428439
