Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
open Lockfree

let item_count = 2_000_000

let rec try_until_success f =
try f () with Spsc_queue.Full -> try_until_success f
let rdv = Rendezvous.semaphore_unit

let run () =
let queue = Spsc_queue.create ~size_exponent:3 in
let queue = Spsc_queue.create ~size_exponent:10 in
let pusher =
Domain.spawn (fun () ->
let start_time = Unix.gettimeofday () in
for i = 1 to item_count do
try_until_success (fun () -> Spsc_queue.push queue i)
Spsc_queue.push ~rdv queue i
done;
start_time)
in
for _ = 1 to item_count do
while Option.is_none (Spsc_queue.pop queue) do
()
done
for i = 1 to item_count do
assert (Spsc_queue.pop ~rdv queue = i)
done;
let end_time = Unix.gettimeofday () in
let start_time = Domain.join pusher in
Expand Down
1 change: 1 addition & 0 deletions src/lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Copyright (c) 2017, Nicolas ASSOUAD <nicolas.assouad@ens.fr>
########
*)
module Rendezvous = Rendezvous
module Ws_deque = Ws_deque
module Spsc_queue = Spsc_queue
module Mpsc_queue = Mpsc_queue
Expand Down
1 change: 1 addition & 0 deletions src/lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Copyright (c) 2017, Nicolas ASSOUAD <nicolas.assouad@ens.fr>
########
*)

module Rendezvous = Rendezvous
module Ws_deque = Ws_deque
module Spsc_queue = Spsc_queue
module Mpsc_queue = Mpsc_queue
Expand Down
67 changes: 67 additions & 0 deletions src/rendezvous.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
(* A rendezvous is a thunk producing a one-shot [(send, recv)] pair that
   lets two domains synchronize on a single future ['a] value. *)
type 'a t = unit -> ('a -> unit) * (unit -> 'a)

(* Reports misuse of a CAS-based rendezvous: [send] was invoked twice. *)
let fail () = failwith "Rendezvous: double send"

(* Rendezvous backed by a binary semaphore: [recv] blocks in the kernel
   until [send] publishes the value and releases the semaphore. *)
let semaphore () =
  let cell = ref None in
  let sem = Semaphore.Binary.make false in
  let send value =
    (* Publish the value first; the release below makes it visible to the
       receiver via the semaphore's internal synchronization. *)
    cell := Some value;
    Semaphore.Binary.release sem
  in
  let recv () =
    Semaphore.Binary.acquire sem;
    Option.get !cell
  in
  (send, recv)

(* Unit-specialized semaphore rendezvous: no value cell is needed, the
   semaphore itself carries the signal. *)
let semaphore_unit () =
  let sem = Semaphore.Binary.make false in
  let send () = Semaphore.Binary.release sem in
  let recv () = Semaphore.Binary.acquire sem in
  (send, recv)

(* Rendezvous that spins with exponential backoff: [recv] polls the atomic
   slot, pausing via [Backoff.once] between failed polls. [send] traps a
   second call with [fail]. *)
let backoff ?min_wait ?max_wait () =
  let slot = Atomic.make None in
  let b = Backoff.create ?min_wait ?max_wait () in
  let send v =
    if not (Atomic.compare_and_set slot None (Some v)) then fail ()
  in
  let rec recv () =
    match Atomic.get slot with
    | Some v -> v
    | None ->
        Backoff.once b;
        recv ()
  in
  (send, recv)

(* Unit-specialized backoff rendezvous: a boolean flag replaces the option
   slot, avoiding an allocation per send. *)
let backoff_unit ?min_wait ?max_wait () =
  let flag = Atomic.make false in
  let b = Backoff.create ?min_wait ?max_wait () in
  let send () =
    if not (Atomic.compare_and_set flag false true) then fail ()
  in
  let rec recv () =
    if not (Atomic.get flag) then begin
      Backoff.once b;
      recv ()
    end
  in
  (send, recv)

(* Pure spin-wait rendezvous: [recv] busy-polls the atomic slot with only
   [Domain.cpu_relax] between attempts — no syscalls, but potentially a lot
   of wasted cycles if [send] is slow to arrive. *)
let spinlock () =
  let slot = Atomic.make None in
  let send v =
    if not (Atomic.compare_and_set slot None (Some v)) then fail ()
  in
  let rec recv () =
    match Atomic.get slot with
    | Some v -> v
    | None ->
        Domain.cpu_relax ();
        recv ()
  in
  (send, recv)

(* Unit-specialized spin-wait rendezvous using a boolean flag instead of an
   option slot. *)
let spinlock_unit () =
  let flag = Atomic.make false in
  let send () =
    if not (Atomic.compare_and_set flag false true) then fail ()
  in
  let rec recv () =
    if not (Atomic.get flag) then begin
      Domain.cpu_relax ();
      recv ()
    end
  in
  (send, recv)
43 changes: 43 additions & 0 deletions src/rendezvous.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
(** One-shot synchronization between two domains.

    When using a lockfree data structure with a library providing fibers,
    like [domainslib] or [eio], the rendezvous should be implemented in
    terms of the blocking primitives provided by the scheduler. *)

type 'a t = unit -> ('a -> unit) * (unit -> 'a)
(** The type to create a rendezvous allowing two domains to synchronize
    on a future ['a] value.

    Given a rendezvous [rdv : 'a t], typical usage follows:

    {[
      let send, recv = rdv () in
      Atomic.set release send; (* publish [send] somewhere accessible to the other domain *)
      let v = recv () in (* block until [send v] has been called and returns [v] *)
    ]}

    The functions [send] and [recv] must each be called at most once. *)

val semaphore : 'a t
(** [semaphore] is a rendezvous that uses a semaphore to suspend the domain.
    Recommended when using raw domains without fibers. *)

val semaphore_unit : unit t
(** Same as [semaphore], specialized for [unit]. *)

val backoff : ?min_wait:int -> ?max_wait:int -> 'a t
(** [backoff] is a rendezvous that locks by spinning, performing a {!Backoff}
    on each failure to make progress.
    Recommended when using raw domains without fibers, if the operation is
    expected to complete soon. *)

val backoff_unit : ?min_wait:int -> ?max_wait:int -> unit t
(** Same as [backoff], specialized for [unit]. *)

val spinlock : 'a t
(** [spinlock] is a rendezvous that locks by spinning, requiring no syscalls.
    Not recommended as it can have terrible performance. *)

val spinlock_unit : unit t
(** Same as [spinlock], specialized for [unit]. *)
49 changes: 41 additions & 8 deletions src/spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
*)

type 'a t = {
array : 'a Option.t Array.t;
array : 'a option array;
tail : int Atomic.t;
head : int Atomic.t;
mask : int;
lock : (unit -> unit) option Atomic.t;
}

exception Full
Expand All @@ -36,29 +37,61 @@ let create ~size_exponent =
head = Atomic.make 0;
tail = Atomic.make 0;
mask = size - 1;
array = Array.init size (fun _ -> None);
array = Array.make size None;
lock = Atomic.make None;
}

let push { array; head; tail; mask; _ } element =
let unlock lock =
if Atomic.get lock <> None then
match Atomic.exchange lock None with
| Some release -> release ()
| None -> ()

let try_push { array; head; tail; mask; lock } element =
let size = mask + 1 in
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val + size == tail_val then raise Full
let head_val = Atomic.get head in
if head_val + size = tail_val then false
else (
Array.set array (tail_val land mask) (Some element);
Atomic.set tail (tail_val + 1))
Atomic.set tail (tail_val + 1);
unlock lock;
true)

let pop { array; head; tail; mask; _ } =
let try_pop { array; head; tail; mask; lock } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then None
if head_val = tail_val then None
else
let index = head_val land mask in
let v = Array.get array index in
(* allow gc to collect it *)
Array.set array index None;
Atomic.set head (head_val + 1);
assert (Option.is_some v);
unlock lock;
v

let size { head; tail; _ } = Atomic.get tail - Atomic.get head

let wait ~rdv t expected_size =
let release, wait = rdv () in
let some_release = Some release in
if Atomic.compare_and_set t.lock None some_release then
if size t = expected_size then wait ()
else
let (_ : bool) = Atomic.compare_and_set t.lock some_release None in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I wonder whether this can cause problems? It is possible that another party either manages to change the size of the queue (before the first CAS) or call release (between the two CASes).

Related to this, to avoid capturing the stack unnecessarily (before the waiters are added), the kcas blocking draft uses three states: Init | Resumed | Waiting of .... Then both suspend and resume take the race condition (where resume happens before suspend is fully ready) into account.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess SPSC makes things a bit simpler here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's possible for another party to call release even though wait () will not be called... I think it's fine? (however, we do need to be careful when calling wait () otherwise it's possible for the queue to be unstuck just before we install our lock, followed by no more operations to wake us up)

(I believe the size t = expected_size is doing something similar to your three states to determine the Resumed state implicitly for the queue internals)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I had in mind is that for the case of release without matching wait one has to make sure that resources are properly released/consumed, but it might not be too much of a problem. Also, release without matching wait could potentially be a broken rendezvous. I guess in the SPSC case that is not an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(To be clear, I totally agree that calling release when wait might not be called feels dirty when compared to your two-phases commit.. but I don't have an intuition for why this might break custom schedulers assumptions)

()
else unlock t.lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Did I understand correctly that if two calls to wait are being made, e.g. to pops when the queue is empty, then what happens is that the second call to wait actually wakes up the first one? So, in that case you get a very expensive busy wait?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah.. SPSC! Right.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah :P This was a quick example that only works for SPSC, but in general multiple waiting pop would actually push their "release" function onto a queue. Then the next push would see the value queue as empty and pop/wake-up the first available pop.


let rec push ~rdv t element =
if not (try_push t element) then (
wait ~rdv t (t.mask + 1);
push ~rdv t element)

let rec pop ~rdv t =
Copy link
Contributor

@polytypic polytypic Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also used a(n optional) named parameter to pass in the suspend/resume mechanism in my kcas blocking draft.

However, what I was thinking is that the suspend/resume (or rendezvous) mechanism could be something that can be obtained from the DLS, for example, at the point when it is needed.

A hypothetical rendezvous framework would provide a way to install a rendezvous mechanism:

type t = (* ((unit -> unit) -> unit) -> unit   OR   whatever  *)

val set : t -> unit
val get : unit -> t (* Raises or returns default mechanism if nothing has been explicitly set *)

Schedulers, like Eio, would call set to install the mechanism when they run their event loops. (Perhaps something like push and pop rather than set might be a better API.)

Facilities that need the ability then call get when they need it.

This would allow things like kcas to work independently of the scheduler(s). You could have domainslib running on some domains and Eio on others. And you could have communication between the domains — without having to know which mechanism to pass from which domain.

Copy link
Contributor

@polytypic polytypic Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An earlier idea / proposal by, I believe, @deepali2806 and @kayceesrk , is to use a Suspend effect.

That would also work. I think the proposals are equally expressive (both can implement the other).

The reason I used the continuation passing style approach is that A) it does not require the suspend mechanism to capture a stack or perform an effect and B) it also doesn't prescribe a specific signature for such an effect. Regarding A, in my previous experiments, I've gotten the impression that effects do have higher costs than function calls. OTOH, it is not clear whether the ability to avoid capturing stacks is really useful — most of the time one would likely be using an effects based scheduler anyway. Regarding B, I think the various libraries currently have similar, but slightly different Suspend like effects — I'm not sure whether or not they could all just use the one and same effect.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Sudha247 is also involved in this effort with @deepali2806. We're using suspend effect now to safely use lazy from different threads. Think blackholing for thunk evaluation as in GHC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm yeah so I liked the idea of being explicit about blocking operations in the API (hence the parameter)... I don't think it's so bad to write ~rdv:dls if desired:

let dls : unit Rendezvous.t = fun () -> DLS.get dls_rendezvous_key ()

(But for example in domainslib, we need the pool argument to wait so I'm not sure we can expect schedulers to preconfigure the dls_rendezvous_key for the user in general)

Regarding effects, I think it's asking a lot more from schedulers to handle our own effect rather than define how to block in their library (it's also a pain for the user to setup if they wanted a real system lock?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Where can I find the work on safe lazy?

Copy link
Contributor

@polytypic polytypic Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Speaking of the different forms of Suspend. In my par-ml experiment I used the following:

type 'a continuation = ('a, unit) Effect.Deep.continuation
type _ Effect.t += Suspend : ('a continuation -> unit) -> 'a Effect.t

The reason I used those definitions is that the handler then becomes trivial:

let effc (type a) : a Effect.t -> _ = function
  | Suspend ef -> Some ef
  | _ -> None

I guess this

type resume_result = Resume_success | Resume_failure

type 'a resumer = 'a -> resume_result
type _ Effect.t += Suspend : ('a resumer -> bool) -> 'a Effect.t

is the current proposal?

I unfortunately haven't had time to think about this thoroughly, but I'm not immediately convinced that the features like resume_result and returning a bool are the best way to go. Are they necessary for best results or could the same be communicated through side-channels efficiently enough? How often are the features needed? (In the unified_interface repo the Resume_failure constructor does not seem to used at all.) They might be necessary, but I need to think about this. Also, are they sufficient? Is there some special case that would benefit from other special features? I assume these questions have been considered and I'd love to hear some more rationale behind the choices.

Also, there is a kind of arbitrary decision here to insert a function, an abstraction, at one point or another. In the approach I used in kcas and the rendezvous proposal here, the interface is very abstract — only simple function types (or pairs of functions) are being used. Why expose the concrete effect rather than abstract over it?

Usually designs that are more abstract have more lasting power. A very concrete design with specific affordances for all the special cases tends to make things more cumbersome.

But like I said, I unfortunately haven't had time to think about this thoroughly enough. And when I say think, I mean that one/I should actually try to implement realistic things with the various proposals and see how they compare (in usability and performance).

Copy link
Contributor

@polytypic polytypic Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in domainslib, we need the pool argument to wait so I'm not sure we can expect schedulers to preconfigure the dls_rendezvous_key for the user in general

These sorts of schedulers, including domainslib and Eio (and every other similar effects based library I know of for OCaml), basically run a loop of some kind on the domains on which they run. I don't see why it would be problematic to install the rendezvous mechanism to DLS just before entering the loop.

It would also be fairly easy to provide a function, in domainslib, like get_pool: unit -> pool (or get_pool: unit -> pool option if you prefer) that obtains the pool with which the current domain is associated with.

One possibility is also to allow both: passing an optional parameter explicitly to tell how to block and, in the absence of such parameter, obtain the mechanism from DLS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't know why domainslib provides the ability to ping-pong between different domain pools... Yet another generic alternative might be to release the blocking domain into a shared pool like idle-domains and expect custom schedulers to integrate with that?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I unfortunately haven't had time to think about this thoroughly, but I'm not immediately convinced that the features like resume_result and returning a bool are the best way to go. Are they necessary for best results or could the same be communicated through side-channels efficiently enough? How often are the features needed? (In the unified_interface repo the Resume_failure constructor does not seem to used at all.) They might be necessary, but I need to think about this. Also, are they sufficient? Is there some special case that would benefit from other special features? I assume these questions have been considered and I'd love to hear some more rationale behind the choices

The return type of resumer captures the notion of cancellation of the task. It means if the particular scheduler cancels the task while it is still in the suspended state, we should avoid resuming it. The resumer is defined in such a way that if the task is live, it actually resumes the task and returns Resume_success when the corresponding resume function is called. In the alternate case, when the task is cancelled, it will return Resume_failure instead of actually resuming the task. In MVar implementation here, we are checking the resumer return value. When the resumer return value is Resume_failure it means we can skip through it and retry the operation again to get the next resumer.

Also, in the type signature of Suspend effect, we have a function that takes a resumer and returns a bool type. It signifies the thread safety while suspending the task. Thread safety, in this case, is achieved through lock-free implementation. It means we are using atomic operations like compare and swap (CAS). When such a CAS operation is successful, we will return the true value, indicating the push to the suspended queue is successful. In case of CAS failure, we have to retry the operation again to get the most recent state of the queue or MVar.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Where can I find the work on safe lazy?

@Sudha247 has a development branch. This is very much "research" atm. Not intended to be upstreamed as is.

match try_pop t with
| Some v -> v
| None ->
wait ~rdv t 0;
pop ~rdv t
21 changes: 13 additions & 8 deletions src/spsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@ val create : size_exponent:int -> 'a t
(** [create ~size_exponent:int] creates an empty queue of size
[2^size_exponent]. *)

val push : 'a t -> 'a -> unit
(** [push q v] pushes [v] at the back of the queue.

@raise [Full] if the queue is full.
*)
val try_push : 'a t -> 'a -> bool
(** [try_push q v] attempts to push [v] at the back of the queue
and returns [true] on success, or [false] if the queue is full. *)

val pop : 'a t -> 'a option
(** [pop q] removes element from head of the queue, if any. This method can be used by
at most 1 thread at the time. *)
val push : rdv:unit Rendezvous.t -> 'a t -> 'a -> unit
(** [push ~rdv q v] pushes [v] at the back of the queue, potentially
blocking using [rdv] while the queue is full. *)

val try_pop : 'a t -> 'a option
(** [try_pop q] removes an element from the head of the queue, if any. *)

val pop : rdv:unit Rendezvous.t -> 'a t -> 'a
(** [pop ~rdv q] removes an element from the head of the queue, potentially
blocking using [rdv] while the queue is empty. *)

val size : 'a t -> int
(** [size] returns the size of the queue. This method linearizes only when called
Expand Down
20 changes: 5 additions & 15 deletions test/spsc_queue/qcheck_spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let pop_n_times q n =
let rec loop count acc =
if count = 0 then acc
else
let v = Spsc_queue.pop q in
let v = Spsc_queue.try_pop q in
Domain.cpu_relax ();
loop (count - 1) (v :: acc)
in
Expand All @@ -33,12 +33,7 @@ let tests =

(* Sequential pushed : not Full exception should be
raised. *)
let not_full_queue =
try
List.iter (Spsc_queue.push q) l;
true
with Spsc_queue.Full -> false
in
let not_full_queue = List.for_all (Spsc_queue.try_push q) l in

(* Consumer domain pops *)
let consumer = Domain.spawn (fun () -> pop_n_times q npop) in
Expand All @@ -63,7 +58,7 @@ let tests =

(* Initialization *)
let q = Spsc_queue.create ~size_exponent in
List.iter (Spsc_queue.push q) l;
List.iter (fun x -> assert (Spsc_queue.try_push q x)) l;

(* Consumer pops *)
let sema = Semaphore.Binary.make false in
Expand All @@ -82,7 +77,7 @@ let tests =
(* Main domain pushes.*)
List.iter
(fun elt ->
Spsc_queue.push q elt;
assert (Spsc_queue.try_push q elt);
Domain.cpu_relax ())
l')
in
Expand All @@ -103,12 +98,7 @@ let tests =

(* Initialization *)
let q = Spsc_queue.create ~size_exponent in
let is_full =
try
List.iter (Spsc_queue.push q) l;
false
with Spsc_queue.Full -> true
in
let is_full = not (List.for_all (Spsc_queue.try_push q) l) in

(* Property *)
(List.length l > size_max && is_full)
Expand Down
16 changes: 8 additions & 8 deletions test/spsc_queue/spsc_queue_dscheck.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ let create_test ~shift_by () =

(* shift the queue, that helps testing overlap handling *)
for _ = 1 to shift_by do
Spsc_queue.push queue (-1);
assert (Option.is_some (Spsc_queue.pop queue))
assert (Spsc_queue.try_push queue (-1));
assert (Option.is_some (Spsc_queue.try_pop queue))
done;

(* enqueuer *)
Atomic.spawn (fun () ->
for i = 1 to items_count do
Spsc_queue.push queue i
assert (Spsc_queue.try_push queue i)
done);

(* dequeuer *)
let dequeued = ref 0 in
Atomic.spawn (fun () ->
for _ = 1 to items_count + 1 do
match Spsc_queue.pop queue with
match Spsc_queue.try_pop queue with
| None -> ()
| Some v ->
assert (v = !dequeued + 1);
Expand All @@ -34,17 +34,17 @@ let with_trace ?(shift_by = 0) f () = Atomic.trace (fun () -> f ~shift_by ())
let size_linearizes_with_1_thr () =
Atomic.trace (fun () ->
let queue = Spsc_queue.create ~size_exponent:4 in
Spsc_queue.push queue (-1);
Spsc_queue.push queue (-1);
assert (Spsc_queue.try_push queue (-1));
assert (Spsc_queue.try_push queue (-1));

Atomic.spawn (fun () ->
for _ = 1 to 4 do
Spsc_queue.push queue (-1)
assert (Spsc_queue.try_push queue (-1))
done);

let size = ref 0 in
Atomic.spawn (fun () ->
assert (Option.is_some (Spsc_queue.pop queue));
assert (Option.is_some (Spsc_queue.try_pop queue));
size := Spsc_queue.size queue);

Atomic.final (fun () -> Atomic.check (fun () -> 1 <= !size && !size <= 5)))
Expand Down
Loading