-
Notifications
You must be signed in to change notification settings - Fork 13
Factor batching logic out of the cohttp-lwt client #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| type 'a t = { | ||
| mutable size: int; | ||
| mutable q: 'a list; | ||
| (* The queue is a FIFO represented as a list in reverse order *) | ||
| batch: int; (* Minimum size to batch before popping *) | ||
| high_watermark: int; | ||
| timeout: Mtime.span option; | ||
| mutable start: Mtime.t; | ||
| } | ||
|
|
||
| let default_high_watermark batch_size = | ||
| if batch_size = 1 then | ||
| 100 | ||
| else | ||
| batch_size * 10 | ||
|
|
||
| let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = | ||
| let high_watermark = | ||
| match high_watermark with | ||
| | Some x -> x | ||
| | None -> default_high_watermark batch | ||
| in | ||
| let start = | ||
| match now with | ||
| | Some x -> x | ||
| | None -> Mtime_clock.now () | ||
| in | ||
| assert (batch > 0); | ||
| { size = 0; q = []; start; batch; timeout; high_watermark } | ||
|
|
||
| let timeout_expired_ ~now self : bool = | ||
| match self.timeout with | ||
| | Some t -> | ||
| let elapsed = Mtime.span now self.start in | ||
| Mtime.Span.compare elapsed t >= 0 | ||
| | None -> false | ||
|
|
||
| (* Big enough to send a batch *) | ||
| let is_full_ self : bool = self.size >= self.batch | ||
|
|
||
| let ready_to_pop ~force ~now self = | ||
| self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) | ||
|
|
||
| let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = | ||
| if ready_to_pop ~force ~now self then ( | ||
| assert (self.q <> []); | ||
| let batch = | ||
| (* Reverse the list to retrieve the FIFO order. *) | ||
| List.rev self.q | ||
| in | ||
| self.q <- []; | ||
| self.size <- 0; | ||
| Some batch | ||
| ) else | ||
| None | ||
|
|
||
| (* Helper so we can count new elements and prepend them onto the existing [q] in | ||
| one pass. *) | ||
| let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = | ||
c-cube marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) | ||
|
|
||
| let push (self : _ t) elems : [ `Dropped | `Ok ] = | ||
| if self.size >= self.high_watermark then | ||
| (* drop this to prevent queue from growing too fast *) | ||
| `Dropped | ||
| else ( | ||
| if self.size = 0 && Option.is_some self.timeout then | ||
| (* current batch starts now *) | ||
| self.start <- Mtime_clock.now (); | ||
|
|
||
| let count, q' = append_with_count ~elems ~q:self.q in | ||
| (* add to queue *) | ||
| self.size <- self.size + count; | ||
| self.q <- q'; | ||
| `Ok | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| (** A thread-safe batch of resources to be popper when ready . *) | ||
|
|
||
| type 'a t | ||
|
|
||
| val make : | ||
| ?batch:int -> | ||
| ?high_watermark:int -> | ||
| ?now:Mtime.t -> | ||
| ?timeout:Mtime.span -> | ||
| unit -> | ||
| 'a t | ||
| (** [make ()] is a new batch | ||
|
|
||
| @param batch | ||
| the number of elements after which the batch will be considered {b full}, | ||
| and ready to pop. Set to [0] to disable batching. It is required that | ||
| [batch >= 0]. Default [1]. | ||
|
|
||
| @param high_watermark | ||
| the batch size limit after which new elements will be [`Dropped] by | ||
| {!push}. This prevents the queue from growing too fast for effective | ||
| transmission in case of signal floods. Default | ||
| [if batch = 1 then 100 else batch * 10]. | ||
|
|
||
| @param now the current time. Default [Mtime_clock.now ()]. | ||
|
|
||
| @param timeout | ||
| the time span after which a batch is ready to pop, whether or not it is | ||
| {b full}. *) | ||
|
|
||
| val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option | ||
| (** [pop_if_ready ~now b] is [Some xs], where is [xs] includes all the elements | ||
| {!push}ed since the last batch, if the batch ready to be emitted. | ||
|
|
||
| A batch is ready to pop if it contains some elements and | ||
|
|
||
| - batching is disabled, and any elements have been batched, or batching was | ||
| enabled and at least [batch] elements have been pushed, or | ||
| - a [timeout] was provided, and more than a [timeout] span has passed since | ||
| the last pop was ready, or | ||
| - the pop is [force]d, | ||
|
|
||
| @param now the current time | ||
|
|
||
| @param force | ||
| override the other batch conditions, for when when we just want to emit | ||
| batches before exit or because the user asks for it *) | ||
|
|
||
| val push : 'a t -> 'a list -> [ `Dropped | `Ok ] | ||
| (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch | ||
| [b], or [`Dropped] if the current size of the batch has exceeded the high water | ||
| mark determined by the [batch] argument to {!make}]. ) *) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| (library | ||
| (name opentelemetry_client) | ||
| (public_name opentelemetry.client) | ||
| (libraries opentelemetry pbrt) | ||
| (libraries opentelemetry pbrt mtime mtime.clock.os) | ||
| (synopsis "Common types and logic shared between client implementations")) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.