This issue is based on my comment on the previous PR that introduced the Tx API. I will likely update this issue with some further notes.
The Tx mechanism is fundamentally limited in that it does not support non-busy waiting or blocking. Unfortunately, adding full support for blocking seems to be outside the scope of the underlying algorithms, as it would add significant dependencies (the ability to suspend fibers and/or domains) and/or significant overheads (keeping a list of waiters with each location and accessing those lists during transactions).
However, there might be practical ways to extend the Tx API to allow it to support low overhead blocking transactions on top of the underlying transaction log mechanism.
To support blocking, one essentially needs a way to signal waiters: after mutating some locations, the mutator signals the waiters. For the mechanism to be scalable, that signal needs to be selective and wake up only those waiters that are interested in the mutated locations.
To associate waiters with locations in a truly low-overhead fashion, one possibility would be to allow locations to be "tagged":
```ocaml
module Loc : sig
  type ('tag, 'a) tagged

  val make_tagged : 'tag -> 'a -> ('tag, 'a) tagged
  val get_tag : ('tag, 'a) tagged -> 'tag

  type 'a t = (unit, 'a) tagged
  (* ... *)
end
```

In a blocking transaction mechanism, that `'tag` could be a bag of the waiters of changes to the location.
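As a rough illustration of how such a tag could hold waiters, here is a standalone sketch. It assumes a location is nothing more than an `Atomic.t` cell paired with an immutable tag, and represents each waiter as a wake-up callback. The `make`, `get`, `set`, `add_waiter` and `signal` names are hypothetical and not part of kcas.

```ocaml
(* Hypothetical stand-in for a tagged location: an atomic cell plus an
   immutable tag. This is not the kcas implementation. *)
module Loc : sig
  type ('tag, 'a) tagged

  val make_tagged : 'tag -> 'a -> ('tag, 'a) tagged
  val get_tag : ('tag, 'a) tagged -> 'tag

  type 'a t = (unit, 'a) tagged

  val make : 'a -> 'a t
  val get : ('tag, 'a) tagged -> 'a
  val set : ('tag, 'a) tagged -> 'a -> unit
end = struct
  type ('tag, 'a) tagged = { tag : 'tag; state : 'a Atomic.t }

  let make_tagged tag value = { tag; state = Atomic.make value }
  let get_tag loc = loc.tag

  type 'a t = (unit, 'a) tagged

  let make value = make_tagged () value
  let get loc = Atomic.get loc.state
  let set loc value = Atomic.set loc.state value
end

(* Assumed tag type: a lock-free bag of wake-up callbacks. *)
type waiters = (unit -> unit) list Atomic.t

(* Waiter side: push [wake] onto the location's bag with a CAS loop. *)
let add_waiter (loc : (waiters, 'a) Loc.tagged) wake =
  let bag = Loc.get_tag loc in
  let rec loop () =
    let current = Atomic.get bag in
    if not (Atomic.compare_and_set bag current (wake :: current)) then loop ()
  in
  loop ()

(* Mutator side: atomically take every waiter of the location and wake it. *)
let signal (loc : (waiters, 'a) Loc.tagged) =
  List.iter (fun wake -> wake ()) (Atomic.exchange (Loc.get_tag loc) [])
```

Because waiters are reachable only through the tags of the locations they registered on, a mutator that calls `signal` on the locations it wrote wakes only the interested waiters.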
Additionally, a scalable blocking mechanism also needs to be able to efficiently figure out which locations have been read and which have been written. A waiter needs to add itself to the locations it read, and a mutator needs to signal the waiters of the locations it wrote.
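To make the read/written distinction concrete, the following sketch models a transaction log as a plain list of entries, each remembering the accessed location and whether it was written. The `reducer` record mirrors the shape of the `Log.reducer` proposed below but, as a simplifying assumption, fixes the tag of every location to a bag of waiter ids so that `one` can inspect it. All names here are hypothetical.

```ocaml
(* Simplifying assumption: every location's tag is a bag of waiter ids. *)
type 'a loc = { waiters : int list ref; value : 'a }

(* Existential wrapper so that locations holding different value types can
   live in the same log. *)
type entry = Entry : { loc : 'a loc; written : bool } -> entry

type log = entry list

type 'r reducer = {
  one : 'a. 'a loc -> 'r;
  zero : 'r;
  plus : 'r -> 'r -> 'r;
}

(* [reduce r log] folds [r] over every location in [log]. *)
let reduce (r : 'r reducer) (log : log) : 'r =
  List.fold_left
    (fun acc entry ->
      match entry with Entry { loc; _ } -> r.plus acc (r.one loc))
    r.zero log

(* Views of the log restricted to locations that were only read, or written. *)
let reads (log : log) : log =
  List.filter (function Entry { written; _ } -> not written) log

let writes (log : log) : log =
  List.filter (function Entry { written; _ } -> written) log

(* A reducer that gathers the waiters registered on the visited locations. *)
let all_waiters : int list reducer =
  { one = (fun loc -> !(loc.waiters)); zero = []; plus = ( @ ) }
```

In this model, a waiter would fold over `reads log` to find where to register, and a mutator would fold over `writes log` to find whom to signal.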
```ocaml
module Tx : sig
  (* ... *)

  module Log : sig
    type t

    type 'r reducer = {
      one : 't 'a. ('t, 'a) Loc.tagged -> 'r;
      zero : 'r;
      plus : 'r -> 'r -> 'r;
    }

    val reduce : 'r reducer -> t -> 'r
    (** [reduce reducer log] performs a fold over the transaction log. *)
  end

  exception Retry of unit t
  (** Exception raised by {!reset_and_retry}. *)

  val reset_and_retry : (Log.t -> unit t) -> 'a t
  (** [reset_and_retry on_read] returns a transaction that resets the current
      transaction such that it only reads from the accessed locations. The
      [on_read] function is then called with the internal transaction log to
      construct a transaction that is then composed after the current
      transaction. The composed transaction [tx] is then raised as a
      [Retry tx] exception. *)

  val written : (Log.t -> unit t) -> 'a t -> 'a t
  (** [written on_written tx] returns a transaction that executes as [tx] and
      then calls the given function with a view of the internal transaction log
      restricted to the written locations. The returned transaction is then
      composed after the transaction [tx].

      The intended use case for [written] is to extend a transaction to signal
      waiters in a blocking transaction mechanism:

      {[
        let rec blocking_tx tx =
          let all_waiters = Loc.make [] in
          match
            tx
            |> written (fun log ->
                   (* remove all waiters of all written locations
                      and add them to the [all_waiters] list. *)
                   )
            |> attempt
          with
          | result ->
              (* signal [all_waiters] *)
              result
          | exception Exit ->
              blocking_tx tx
          | exception Retry add_waiters_tx -> (
              match attempt add_waiters_tx with
              | () ->
                  (* blocking wait *)
                  blocking_tx tx
              | exception Exit ->
                  (* Locations were already mutated before waiters could be added *)
                  blocking_tx tx)
      ]} *)
end
```

The idea of (resetting and) extending transactions with the waiter operations is that the kcas mechanism itself then checks whether the waiters should be added or signaled. Waiters are added only if the read locations did not change during the original transaction or during the addition of the waiters; if either check fails, the transaction can simply be retried without blocking. Waiters are signaled only if the mutations, including taking all the waiters, completed successfully.
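The following single-threaded model sketches that protocol under the assumption that each location carries a version number and a waiter bag; it is not how kcas works internally. A waiter may register only if none of the locations it read have changed since it read them, and a writer takes and signals all waiters of the location it writes. In the proposal, the validation and the registration would be committed together as one kcas transaction; here the model simply runs sequentially. All names are hypothetical.

```ocaml
(* Hypothetical model: each location has a value, a version that is bumped on
   every write, and a bag of waiters (wake-up callbacks). *)
type 'a loc = {
  mutable value : 'a;
  mutable version : int;
  mutable waiters : (unit -> unit) list;
}

(* An entry of the read set: a location and the version observed when read. *)
type read = Read : 'a loc * int -> read

let make value = { value; version = 0; waiters = [] }

(* Reader side: return the value and record the read in [reads]. *)
let get reads loc =
  reads := Read (loc, loc.version) :: !reads;
  loc.value

(* Mutator side: write, bump the version, then take and signal all waiters. *)
let set loc value =
  loc.value <- value;
  loc.version <- loc.version + 1;
  let waiters = loc.waiters in
  loc.waiters <- [];
  List.iter (fun wake -> wake ()) waiters

(* Waiter side: register [wake] on every read location, but only if none of
   them changed since it was read. Returns [false] when the caller should
   simply retry instead of blocking. *)
let add_waiters reads wake =
  let unchanged =
    List.for_all (function Read (loc, version) -> loc.version = version) reads
  in
  if unchanged then
    List.iter (function Read (loc, _) -> loc.waiters <- wake :: loc.waiters) reads;
  unchanged
```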
The above is only a preliminary idea. I have not yet fully implemented it to verify it in practice.
Here is the `blocking_tx` example with proper highlighting:

```ocaml
let rec blocking_tx tx =
  let all_waiters = Loc.make [] in
  match
    tx
    |> written (fun log ->
           (* remove all waiters of all written locations
              and add them to the [all_waiters] list. *)
           )
    |> attempt
  with
  | result ->
      (* signal [all_waiters] *)
      result
  | exception Exit ->
      blocking_tx tx
  | exception Retry add_waiters_tx -> (
      match attempt add_waiters_tx with
      | () ->
          (* blocking wait *)
          blocking_tx tx
      | exception Exit ->
          (* Locations were already mutated before waiters could be added *)
          blocking_tx tx)
```

Of course, a proper implementation would be a bit more complicated, with things like backoff.
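For illustration only, one common shape for such backoff is bounded, randomized exponential backoff: after each failed attempt, spin for a random number of iterations up to a limit that doubles each time, capped at an upper bound. The sketch below assumes an `attempt` function that returns `None` on failure. `backoff`, `create`, `once` and `retry` are hypothetical names, and the busy spin merely stands in for whatever waiting primitive a real implementation would use.

```ocaml
(* Bounded, randomized exponential backoff (hypothetical sketch). *)
type backoff = { current : int; upper : int }

let create ?(lower = 1) ?(upper = 1024) () =
  if lower < 1 || upper < lower then invalid_arg "create";
  { current = lower; upper }

(* Spin for a random number of iterations in [1, current], then double the
   limit, capped at [upper]. *)
let once b =
  let spins = 1 + Random.int b.current in
  for _i = 1 to spins do
    ignore (Sys.opaque_identity spins)
  done;
  { b with current = min b.upper (b.current * 2) }

(* Call [attempt] until it succeeds, backing off between failures. *)
let retry attempt =
  let rec loop b =
    match attempt () with
    | Some result -> result
    | None -> loop (once b)
  in
  loop (create ())
```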