Skip to content

Commit 0dce076

Browse files
committed
WIP: Add blocking
1 parent 6465475 commit 0dce076

File tree

18 files changed

+489
-79
lines changed

18 files changed

+489
-79
lines changed

README.md

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ is distributed under the [ISC license](LICENSE.md).
4646
- [A transactional lock-free stack](#a-transactional-lock-free-stack)
4747
- [A transactional lock-free queue](#a-transactional-lock-free-queue)
4848
- [Composing transactions](#composing-transactions)
49-
- [About transactions](#about-transactions)
5049
- [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap)
5150
- [A composable Michael-Scott style queue](#a-composable-michael-scott-style-queue)
5251
- [Designing lock-free algorithms with k-CAS](#designing-lock-free-algorithms-with-k-cas)
@@ -58,6 +57,7 @@ is distributed under the [ISC license](LICENSE.md).
5857
- [Understanding transactions](#understanding-transactions)
5958
- [A three-stack lock-free queue](#a-three-stack-lock-free-queue)
6059
- [A rehashable lock-free hash table](#a-rehashable-lock-free-hash-table)
60+
- [Blocking](#blocking)
6161
- [Beware of torn reads](#beware-of-torn-reads)
6262
- [Development](#development)
6363

@@ -504,25 +504,6 @@ Or transfer elements between different transactional data structures:
504504
The ability to compose transactions allows algorithms and data-structures to be
505505
used for a wider variety of purposes.
506506

507-
#### About transactions
508-
509-
The transaction mechanism provided by **kcas** is quite intentionally designed
510-
to be very simple and efficient. This also means that it cannot provide certain
511-
features, because adding such features would either add significant dependencies
512-
or overheads to the otherwise simple and efficient implementation. In
513-
particular, the transactions provided by **kcas** do not directly provide
514-
blocking or the ability to wait for changes to shared memory locations before
515-
retrying a transaction. The way
516-
[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-commit)
517-
works is that it simply retries the transaction in case it failed. To avoid
518-
contention, a
519-
[`backoff`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Backoff/index.html)
520-
mechanism is used, but otherwise
521-
[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-commit)
522-
will essentially perform a
523-
[busy-wait](https://en.wikipedia.org/wiki/Busy_waiting), which should usually be
524-
avoided.
525-
526507
#### A transactional lock-free leftist heap
527508

528509
Let's implement something a bit more complicated,
@@ -1434,6 +1415,54 @@ What we have here is a lock-free hash table with rehashing that should not be
14341415
highly prone to starvation. In other respects this is a fairly naive hash table
14351416
implementation. You might want to think about various ways to improve upon it.
14361417

1418+
### Blocking
1419+
1420+
Consider the following approach to unconditionally pop an element from a stack:
1421+
1422+
```ocaml
1423+
# let pop ~xt stack =
1424+
match Xt.update ~xt stack tl_safe with
1425+
| [] -> raise Exit
1426+
| elem :: _ -> elem
1427+
val pop : xt:'a Xt.t -> 'b list Loc.t -> 'b = <fun>
1428+
```
1429+
1430+
A transaction that attempts to use the above on an empty stack will keep on
1431+
retrying until some other domain pushes an element to the stack. This sort of
1432+
[busy-wait](https://en.wikipedia.org/wiki/Busy_waiting) should generally be
1433+
avoided.
1434+
1435+
```ocaml
1436+
# let stack () : _ stack = Loc.make ~awaitable:true []
1437+
val stack : unit -> 'a stack = <fun>
1438+
```
1439+
1440+
```ocaml
1441+
# let input_stack : int stack = stack ()
1442+
val input_stack : int stack = <abstr>
1443+
# let output_stack : int stack = stack ()
1444+
val output_stack : int stack = <abstr>
1445+
```
1446+
1447+
```ocaml
1448+
# let other = Domain.spawn @@ fun () ->
1449+
let x = Xt.commit { tx = pop input_stack } in
1450+
Xt.commit { tx = push output_stack x }
1451+
val other : unit Domain.t = <abstr>
1452+
```
1453+
1454+
```ocaml
1455+
# Xt.commit { tx = push input_stack 42 }
1456+
- : unit = ()
1457+
# Xt.commit { tx = pop output_stack }
1458+
- : int = 42
1459+
```
1460+
1461+
```ocaml
1462+
# Domain.join other
1463+
- : unit = ()
1464+
```
1465+
14371466
### Beware of torn reads
14381467

14391468
The algorithm underlying **kcas** ensures that it is not possible to read

src/backoff.mli

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1717
*)
1818

19+
(** Randomized exponential backoff mechanism. *)
20+
1921
type t
2022
(** Type of backoff values. *)
2123

src/kcas.ml

Lines changed: 119 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,16 @@
33
* Copyright (c) 2023, Vesa Karvonen <vesa.a.j.k@gmail.com>
44
*)
55

6+
module Trigger = Trigger
67
module Backoff = Backoff
78

9+
module AwaitableId = struct
10+
let id = Atomic.make (Int.min_int asr 1)
11+
let get_unique () = Atomic.fetch_and_add id 1
12+
let is id = id < 0
13+
let to_awaiters id = Int.min_int - 1 - id
14+
end
15+
816
module Id = struct
917
let id = Atomic.make 1
1018
let get_unique () = Atomic.fetch_and_add id 1
@@ -30,9 +38,11 @@ end = struct
3038
[@@inline]
3139
end
3240

41+
type awaiter = [ `Init | `Resumed | `Waiting of unit -> unit ] Atomic.t
3342
type determined = [ `After | `Before ]
3443

35-
type 'a loc = { state : 'a state Atomic.t; id : int }
44+
type 'a loc = { state : 'a state Atomic.t; id : int; awaiters : awaiters }
45+
and awaiters = awaiter list loc
3646
and 'a state = { mutable before : 'a; mutable after : 'a; mutable casn : casn }
3747
and cass = CASN : 'a loc * 'a state * cass * cass -> cass | NIL : cass
3848
and casn = status Atomic.t
@@ -214,12 +224,23 @@ let cas loc before state =
214224
let inc x = x + 1
215225
let dec x = x - 1
216226

227+
let make_awaiters id =
228+
let state = Atomic.make @@ new_state [] and id = AwaitableId.to_awaiters id in
229+
let rec awaiters = { state; id; awaiters } in
230+
awaiters
231+
217232
module Loc = struct
218233
type 'a t = 'a loc
219234

220-
let make after =
221-
{ state = Atomic.make @@ new_state after; id = Id.get_unique () }
235+
let make ?(awaitable = false) after =
236+
let state = Atomic.make @@ new_state after in
237+
let id =
238+
if awaitable then AwaitableId.get_unique () else Id.get_unique ()
239+
in
240+
let awaiters = if awaitable then make_awaiters id else Obj.magic () in
241+
{ state; id; awaiters }
222242

243+
let is_awaitable loc = AwaitableId.is loc.id
223244
let get_id loc = loc.id [@@inline]
224245

225246
let get loc =
@@ -391,38 +412,106 @@ module Xt = struct
391412

392413
let call { tx } = tx [@@inline]
393414

394-
let attempt (mode : Mode.t) tx =
415+
let rec take_awaiters ~xt awaiters = function
416+
| NIL -> awaiters
417+
| CASN (loc, state, l, r) ->
418+
let awaiters =
419+
if l != NIL then take_awaiters ~xt awaiters l else awaiters
420+
in
421+
if AwaitableId.is loc.id then
422+
let awaiters =
423+
if state.before != state.after then
424+
match exchange ~xt loc.awaiters [] with
425+
| [] -> awaiters
426+
| aws -> aws :: awaiters
427+
else awaiters
428+
in
429+
take_awaiters ~xt awaiters r
430+
else awaiters
431+
432+
let resume_awaiters awaiters =
433+
awaiters
434+
|> List.iter @@ List.iter
435+
@@ fun awaiter ->
436+
match Atomic.exchange awaiter `Resumed with
437+
| `Waiting resume -> resume ()
438+
| _ -> ()
439+
[@@inline never]
440+
441+
let resume_awaiters = function
442+
| [] -> ()
443+
| awaiters -> resume_awaiters awaiters
444+
[@@inline]
445+
446+
let rec add_awaiter ~xt awaiter = function
447+
| NIL -> ()
448+
| CASN (loc, _, l, r) ->
449+
if l != NIL then add_awaiter ~xt awaiter l;
450+
if AwaitableId.is loc.id then (
451+
modify ~xt loc.awaiters (List.cons awaiter);
452+
add_awaiter ~xt awaiter r)
453+
454+
let rec reset casn = function
455+
| NIL -> NIL
456+
| CASN (loc, state, l, r) as old ->
457+
let l' = reset casn l and r' = reset casn r in
458+
if is_cmp casn state then
459+
if l == l' && r == r' then old else CASN (loc, state, l', r')
460+
else
461+
let state' = Atomic.get loc.state in
462+
let current = eval state' in
463+
if current != state.before then exit () else CASN (loc, state', l, r)
464+
465+
let rec commit backoff (mode : Mode.t) tx =
395466
let xt =
396467
let casn = Atomic.make (mode :> status)
397468
and cass = NIL
398469
and post_commit = Action.noop in
399470
{ casn; cass; post_commit }
400471
in
401-
let result = tx ~xt in
402-
match xt.cass with
403-
| NIL -> Action.run xt.post_commit result
404-
| CASN (loc, state, NIL, NIL) ->
405-
if is_cmp xt.casn state then Action.run xt.post_commit result
406-
else
407-
let before = state.before in
408-
state.before <- state.after;
409-
if cas loc before state then Action.run xt.post_commit result
410-
else exit ()
411-
| cass ->
412-
if determine_for_owner xt.casn cass then
413-
Action.run xt.post_commit result
414-
else exit ()
415-
416-
let rec commit backoff mode tx =
417-
match attempt mode tx with
418-
| result -> result
419-
| exception Mode.Interference ->
420-
commit (Backoff.once backoff) Mode.lock_free tx
421-
| exception Exit -> commit (Backoff.once backoff) mode tx
422-
423-
let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx =
424-
commit backoff mode tx.tx
472+
match tx ~xt with
473+
| result -> (
474+
let awaiters = take_awaiters ~xt [] xt.cass in
475+
match xt.cass with
476+
| NIL -> Action.run xt.post_commit result
477+
| CASN (loc, state, NIL, NIL) ->
478+
if is_cmp xt.casn state then Action.run xt.post_commit result
479+
else
480+
let before = state.before in
481+
state.before <- state.after;
482+
if cas loc before state then Action.run xt.post_commit result
483+
else commit (Backoff.once backoff) mode tx
484+
| cass -> (
485+
match determine_for_owner xt.casn cass with
486+
| true ->
487+
resume_awaiters awaiters;
488+
Action.run xt.post_commit result
489+
| false -> commit (Backoff.once backoff) mode tx
490+
| exception Mode.Interference ->
491+
commit (Backoff.once backoff) Mode.lock_free tx))
492+
| exception Exit -> (
493+
match reset xt.casn xt.cass with
494+
| cass -> (
495+
xt.cass <- cass;
496+
let self = Atomic.make `Init in
497+
add_awaiter ~xt self cass;
498+
if xt.cass == cass then commit (Backoff.once backoff) mode tx
499+
else
500+
match determine_for_owner xt.casn xt.cass with
501+
| true ->
502+
(if Atomic.get self == `Init then
503+
let t = Trigger.prepare_for_await () in
504+
if Atomic.compare_and_set self `Init (`Waiting t.release)
505+
then t.await ());
506+
(* TODO: remove awaiters *)
507+
commit (Backoff.once (Backoff.reset backoff)) mode tx
508+
| false -> commit (Backoff.once backoff) mode tx
509+
| exception Mode.Interference ->
510+
commit (Backoff.once backoff) Mode.lock_free tx)
511+
| exception Exit -> commit (Backoff.once backoff) mode tx)
512+
513+
let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) { tx }
514+
=
515+
commit backoff mode tx
425516
[@@inline]
426-
427-
let attempt ?(mode = Mode.lock_free) tx = attempt mode tx.tx [@@inline]
428517
end

src/kcas.mli

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(** {1 Auxiliary modules} *)
22

3+
module Trigger : module type of Trigger
34
module Backoff : module type of Backoff
4-
(** Randomized exponential backoff mechanism. *)
55

66
(** {1 Individual locations}
77
@@ -13,9 +13,17 @@ module Loc : sig
1313
type !'a t
1414
(** Type of shared memory locations. *)
1515

16-
val make : 'a -> 'a t
16+
val make : ?awaitable:bool -> 'a -> 'a t
1717
(** [make initial] creates a new shared memory location with the [initial]
18-
value. *)
18+
value.
19+
20+
By default a location is considered to be unawaitable to avoid overheads.
21+
{!Xt.commit} cannot block to await for changes to unawaitable locations.
22+
To make a location awaitable, simply pass [~awaitable:true]. *)
23+
24+
val is_awaitable : 'a t -> bool
25+
(** [is_awaitable r] determines whether the shared memory location [r] is
26+
awaitable. *)
1927

2028
val get_id : 'a t -> int
2129
(** [get_id r] returns the unique id of the shared memory location [r]. *)
@@ -239,24 +247,14 @@ module Xt : sig
239247
val call : 'a tx -> xt:'x t -> 'a
240248
(** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *)
241249

242-
val attempt : ?mode:Mode.t -> 'a tx -> 'a
243-
(** [attempt tx] attempts to atomically perform the transaction over shared
244-
memory locations recorded by calling [tx] with a fresh explicit
245-
transaction log. If used in {!Mode.obstruction_free} may raise
246-
{!Mode.Interference}. Otherwise either raises [Exit] on failure to commit
247-
the transaction or returns the result of the transaction. The default for
248-
[attempt] is {!Mode.lock_free}. *)
249-
250250
val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a
251-
(** [commit tx] repeats [attempt tx] until it does not raise [Exit] or
252-
{!Mode.Interference} and then either returns or raises whatever attempt
253-
returned or raised.
251+
(** [commit tx] repeatedly calls [tx] to record a log of shared memory
252+
accesses and attempts to perform them atomically until it succeeds and
253+
then returns whatever [tx] returned. [tx] may raise [Exit] to explicitly
254+
request a retry or any other exception than {!Mode.Interference} to abort
255+
the transaction.
254256
255257
The default for [commit] is {!Mode.obstruction_free}. However, after
256258
enough attempts have failed during the verification step, [commit]
257-
switches to {!Mode.lock_free}.
258-
259-
Note that, aside from using exponential backoff to reduce contention, the
260-
transaction mechanism has no way to intelligently wait until shared memory
261-
locations are modified by other domains. *)
259+
switches to {!Mode.lock_free}. *)
262260
end

src/kcas_data/kcas_data.ml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ module Hashtbl = Hashtbl
5454
module Queue = Queue
5555
module Stack = Stack
5656

57+
(** {1 [Eio] style synchronization primitives} *)
58+
59+
module Promise = Promise
60+
module Semaphore = Semaphore
61+
5762
(** {1 Utilities} *)
5863

5964
module Accumulator = Accumulator

0 commit comments

Comments
 (0)