-
Notifications
You must be signed in to change notification settings - Fork 31
Rendezvous and demonstration on Spsc_queue API #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| type 'a t = unit -> ('a -> unit) * (unit -> 'a) | ||
|
|
||
| let fail () = failwith "Rendezvous: double send" | ||
|
|
||
| let semaphore () = | ||
| let r = ref None in | ||
| let s = Semaphore.Binary.make false in | ||
| let send v = | ||
| r := Some v; | ||
| Semaphore.Binary.release s | ||
| and recv () = | ||
| Semaphore.Binary.acquire s; | ||
| Option.get !r | ||
| in | ||
| (send, recv) | ||
|
|
||
| let semaphore_unit () = | ||
| let s = Semaphore.Binary.make false in | ||
| let send () = Semaphore.Binary.release s | ||
| and recv () = Semaphore.Binary.acquire s in | ||
| (send, recv) | ||
|
|
||
| let backoff ?min_wait ?max_wait () = | ||
| let r = Atomic.make None in | ||
| let backoff = Backoff.create ?min_wait ?max_wait () in | ||
| let rec send v = if not (Atomic.compare_and_set r None (Some v)) then fail () | ||
| and recv () = | ||
| match Atomic.get r with | ||
| | Some v -> v | ||
| | None -> | ||
| Backoff.once backoff; | ||
| recv () | ||
| in | ||
| (send, recv) | ||
|
|
||
| let backoff_unit ?min_wait ?max_wait () = | ||
| let r = Atomic.make false in | ||
| let backoff = Backoff.create ?min_wait ?max_wait () in | ||
| let rec send () = if not (Atomic.compare_and_set r false true) then fail () | ||
| and recv () = | ||
| if not (Atomic.get r) then ( | ||
| Backoff.once backoff; | ||
| recv ()) | ||
| in | ||
| (send, recv) | ||
|
|
||
| let spinlock () = | ||
| let r = Atomic.make None in | ||
| let rec send v = if not (Atomic.compare_and_set r None (Some v)) then fail () | ||
| and recv () = | ||
| match Atomic.get r with | ||
| | Some v -> v | ||
| | None -> | ||
| Domain.cpu_relax (); | ||
| recv () | ||
| in | ||
| (send, recv) | ||
|
|
||
| let spinlock_unit () = | ||
| let r = Atomic.make false in | ||
| let rec send () = if not (Atomic.compare_and_set r false true) then fail () | ||
| and recv () = | ||
| if not (Atomic.get r) then ( | ||
| Domain.cpu_relax (); | ||
| recv ()) | ||
| in | ||
| (send, recv) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| (** One-shot synchronization between two domains. | ||
|
|
||
| When using a lock-free data structure with a library providing fibers, like | ||
| [domainslib] or [eio], the rendezvous should be implemented in terms of the | ||
| blocking primitives provided by the scheduler. *) | ||
|
|
||
| type 'a t = unit -> ('a -> unit) * (unit -> 'a) | ||
| (** The type to create a rendezvous allowing two domains to synchronize | ||
| on a future ['a] value. | ||
|
|
||
| Given a rendezvous [rdv : 'a t], typical usage follows: | ||
|
|
||
| {[ | ||
| let send, recv = rdv () in | ||
| Atomic.set release send; (* publish [send] somewhere accessible to the other domain *) ||
| let v = recv () in (* block until [send v] has been called and returns [v] *) | ||
| ]} | ||
|
|
||
| The functions [send] and [recv] must each be called at most once. | ||
| *) | ||
|
|
||
| val semaphore : 'a t | ||
| (** [semaphore] is a rendezvous that uses a semaphore to suspend the domain. | ||
| Recommended when using raw domains without fibers. *) | ||
|
|
||
| val semaphore_unit : unit t | ||
| (** Same as [semaphore], specialized for [unit]. *) | ||
|
|
||
| val backoff : ?min_wait:int -> ?max_wait:int -> 'a t | ||
| (** [backoff] is a rendezvous that locks by spinning, performing a {!Backoff} | ||
| on each failure to make progress. | ||
| Recommended when using raw domains without fibers, if the operation is | ||
| expected to complete soon. *) | ||
|
|
||
| val backoff_unit : ?min_wait:int -> ?max_wait:int -> unit t | ||
| (** Same as [backoff], specialized for [unit]. *) | ||
|
|
||
| val spinlock : 'a t | ||
| (** [spinlock] is a rendezvous that locks by spinning, requiring no syscalls. | ||
| Not recommended as it can have terrible performance. *) | ||
|
|
||
| val spinlock_unit : unit t | ||
| (** Same as [spinlock], specialized for [unit]. *) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,10 +22,11 @@ | |
| *) | ||
|
|
||
| type 'a t = { | ||
| array : 'a Option.t Array.t; | ||
| array : 'a option array; | ||
| tail : int Atomic.t; | ||
| head : int Atomic.t; | ||
| mask : int; | ||
| lock : (unit -> unit) option Atomic.t; | ||
| } | ||
|
|
||
| exception Full | ||
|
|
@@ -36,29 +37,61 @@ let create ~size_exponent = | |
| head = Atomic.make 0; | ||
| tail = Atomic.make 0; | ||
| mask = size - 1; | ||
| array = Array.init size (fun _ -> None); | ||
| array = Array.make size None; | ||
| lock = Atomic.make None; | ||
| } | ||
|
|
||
| let push { array; head; tail; mask; _ } element = | ||
| let unlock lock = | ||
| if Atomic.get lock <> None then | ||
| match Atomic.exchange lock None with | ||
| | Some release -> release () | ||
| | None -> () | ||
|
|
||
| let try_push { array; head; tail; mask; lock } element = | ||
| let size = mask + 1 in | ||
| let head_val = Atomic.get head in | ||
| let tail_val = Atomic.get tail in | ||
| if head_val + size == tail_val then raise Full | ||
| let head_val = Atomic.get head in | ||
| if head_val + size = tail_val then false | ||
| else ( | ||
| Array.set array (tail_val land mask) (Some element); | ||
| Atomic.set tail (tail_val + 1)) | ||
| Atomic.set tail (tail_val + 1); | ||
| unlock lock; | ||
| true) | ||
|
|
||
| let pop { array; head; tail; mask; _ } = | ||
| let try_pop { array; head; tail; mask; lock } = | ||
| let head_val = Atomic.get head in | ||
| let tail_val = Atomic.get tail in | ||
| if head_val == tail_val then None | ||
| if head_val = tail_val then None | ||
| else | ||
| let index = head_val land mask in | ||
| let v = Array.get array index in | ||
| (* allow gc to collect it *) | ||
| Array.set array index None; | ||
| Atomic.set head (head_val + 1); | ||
| assert (Option.is_some v); | ||
| unlock lock; | ||
| v | ||
|
|
||
| let size { head; tail; _ } = Atomic.get tail - Atomic.get head | ||
|
|
||
| let wait ~rdv t expected_size = | ||
| let release, wait = rdv () in | ||
| let some_release = Some release in | ||
| if Atomic.compare_and_set t.lock None some_release then | ||
| if size t = expected_size then wait () | ||
| else | ||
| let (_ : bool) = Atomic.compare_and_set t.lock some_release None in | ||
| () | ||
| else unlock t.lock | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... Did I understand correctly that if two calls to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah.. SPSC! Right.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah :P This was a quick example that only works for SPSC, but in general multiple waiting |
||
|
|
||
| let rec push ~rdv t element = | ||
| if not (try_push t element) then ( | ||
| wait ~rdv t (t.mask + 1); | ||
| push ~rdv t element) | ||
|
|
||
| let rec pop ~rdv t = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also used a(n optional) named parameter to pass in the suspend/resume mechanism in my kcas blocking draft. However, what I was thinking is that the suspend/resume (or rendezvous) mechanism could be something that can be obtained from the DLS, for example, at the point when it is needed. A hypothetical rendezvous framework would provide a way to install a rendezvous mechanism: type t = (* ((unit -> unit) -> unit) -> unit OR whatever *)
val set : t -> unit
val get : unit -> t (* Raises or returns default mechanism if nothing has been explicitly set *)Schedulers, like Eio, would call Facilities that need the ability then call This would allow things like kcas to work independently of the scheduler(s). You could have domainslib running on some domains and Eio on others. And you could have communication between the domains — without having to know which mechanism to pass from which domain.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An earlier idea / proposal by, I believe, @deepali2806 and @kayceesrk , is to use a That would also work. I think the proposals are equally expressive (both can implement the other). The reason I used the continuation passing style approach is that A) it does not require the suspend mechanism to capture a stack or perform an effect and B) it also doesn't prescribe a specific signature for such an effect. Regarding A, in my previous experiments, I've gotten the impression that effects do have higher costs than function calls. OTOH, it is not clear whether the ability to avoid capturing stacks is really useful — most of the time one would likely be using an effects based scheduler anyway. Regarding B, I think the various libraries currently have similar, but slightly different
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Sudha247 is also involved in this effort with @deepali2806. We're using suspend effect now to safely use lazy from different threads. Think blackholing for thunk evaluation as in GHC.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm yeah so I liked the idea of being explicit about blocking operations in the API (hence the parameter)... I don't think it's so bad to write let dls : unit Rendezvous.t = fun () -> DLS.get dls_rendezvous_key ()(But for example in Regarding effects, I think it's asking a lot more from schedulers to handle our own effect rather than define how to block in their library (it's also a pain for the user to setup if they wanted a real system lock?)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... Where can I find the work on safe lazy?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Speaking of the different forms of type 'a continuation = ('a, unit) Effect.Deep.continuation
type _ Effect.t += Suspend : ('a continuation -> unit) -> 'a Effect.tThe reason I used those definitions is that the handler then becomes trivial: let effc (type a) : a Effect.t -> _ = function
| Suspend ef -> Some ef
| _ -> NoneI guess this type resume_result = Resume_success | Resume_failure
type 'a resumer = 'a -> resume_result
type _ Effect.t += Suspend : ('a resumer -> bool) -> 'a Effect.tis the current proposal? I unfortunately haven't had time to think about this thoroughly, but I'm not immediately convinced that the features like Also, there is a kind of arbitrary decision here to insert a function, an abstraction, at one point or another. In the approach I used in kcas and the rendezvous proposal here, the interface is very abstract — only simple function types (or pairs of functions) are being used. Why expose the concrete effect rather than abstract over it? Usually designs that are more abstract have more lasting power. A very concrete design with specific affordances for all the special cases tends to make things more cumbersome. But like I said, I unfortunately haven't had time to think about this thoroughly enough. And when I say think, I mean that one/I should actually try to implement realistic things with the various proposals and see how they compare (in usability and performance).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
These sorts of schedulers, including It would also be fairly easy to provide a function, in One possibility is also to allow both: passing an optional parameter explicitly to tell how to block and, in the absence of such parameter, obtain the mechanism from DLS.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I don't know why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The return type of resumer captures the notion of cancellation of the task. It means if the particular scheduler cancels the task while it is still in the suspended state, we should avoid resuming it. The resumer is defined in such a way that if the task is live, it actually resumes the task and returns Also, in the type signature of Suspend effect, we have a function that takes a resumer and returns a
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@Sudha247 has a development branch. This is very much "research" atm. Not intended to be upstreamed as is. |
||
| match try_pop t with | ||
| | Some v -> v | ||
| | None -> | ||
| wait ~rdv t 0; | ||
| pop ~rdv t | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... I wonder whether this can cause problems? It is possible that another party either manages to change the size of the queue (before the first CAS) or call
release(between the two CASes).Related to this, to avoid capturing the stack unnecessarily (before the waiters are added), the kcas blocking draft uses three states:
Init | Resumed | Waiting of .... Then both suspend and resume take the race condition (where resume happens before suspend is fully ready) into account.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess SPSC makes things a bit simpler here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it's possible for another party to call
releaseeven thoughwait ()will not be called... I think it's fine? (however, we do need to be careful when callingwait ()otherwise it's possible for the queue to be unstuck just before we install our lock, followed by no more operations to wake us up)(I believe the
size t = expected_sizeis doing something similar to your three states to determine theResumedstate implicitly for the queue internals)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I had in mind is that for the case of
releasewithout matchingwaitone has to make sure that resources are properly released/consumed, but it might not be too much of a problem. Also,releasewithout matchingwaitcould potentially be a broken rendezvous. I guess in the SPSC case that is not an issue.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(To be clear, I totally agree that calling
releasewhenwaitmight not be called feels dirty when compared to your two-phases commit.. but I don't have an intuition for why this might break custom schedulers assumptions)