Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 15 additions & 95 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
module OT = Opentelemetry
module Config = Config
module Signal = Opentelemetry_client.Signal
module Batch = Opentelemetry_client.Batch
open Opentelemetry
open Common_

Expand Down Expand Up @@ -164,88 +165,6 @@ end = struct
)
end

(** Batch of resources to be pushed later.

    This type is thread-safe.
    NOTE(review): there is no lock or atomic protecting the mutable fields;
    presumably "thread-safe" relies on cooperative (Lwt) scheduling —
    confirm before sharing across system threads/domains. *)
module Batch : sig
  type 'a t

  val push' : 'a t -> 'a -> unit
  (** Push one element, discarding the internal "ready" result. *)

  val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
  (** Is the batch ready to be emitted? If batching is disabled, this is true as
      soon as the batch is non-empty. If a timeout is provided for this batch,
      then it will be ready if an element has been in it for at least the
      timeout.
      NOTE(review): the returned list is in reverse push order (most recent
      first) — confirm consumers do not rely on FIFO order.
      @param now passed to implement timeout *)

  val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
  (** Create a new batch *)
end = struct
  type 'a t = {
    mutable size: int;  (* number of elements currently in [q] *)
    mutable q: 'a list;  (* stored in reverse push order *)
    batch: int option;  (* [None] means batching is disabled *)
    high_watermark: int;  (* at/above this size, new elements are dropped *)
    timeout: Mtime.span option;
    mutable start: Mtime.t;  (* when the current batch started filling *)
  }

  (* [make ?batch ?timeout ()] is an empty batch; [batch], when given, must be
     positive. High-water mark is 10 batches' worth, or 100 without batching. *)
  let make ?batch ?timeout () : _ t =
    Option.iter (fun b -> assert (b > 0)) batch;
    let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
    {
      size = 0;
      start = Mtime_clock.now ();
      q = [];
      batch;
      timeout;
      high_watermark;
    }

  (* Has the current batch been open for at least [timeout]? Always false when
     no timeout was configured. *)
  let timeout_expired_ ~now self : bool =
    match self.timeout with
    | Some t ->
      let elapsed = Mtime.span now self.start in
      Mtime.Span.compare elapsed t >= 0
    | None -> false

  (* Big enough to emit: any element at all when batching is disabled,
     otherwise at least [b] elements. *)
  let is_full_ self : bool =
    match self.batch with
    | None -> self.size > 0
    | Some b -> self.size >= b

  (* Empty the queue and return its contents when non-empty and forced, full,
     or timed out. The result is in reverse push order (not reversed here). *)
  let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
    if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
    then (
      let l = self.q in
      self.q <- [];
      self.size <- 0;
      assert (l <> []);
      Some l
    ) else
      None

  (* Add [x]; returns [true] when the batch is ready to emit. Over the
     high-water mark the element is dropped (counted in [n_dropped]) and
     [true] is returned as well, prompting the caller to flush. *)
  let push (self : _ t) x : bool =
    if self.size >= self.high_watermark then (
      (* drop this to prevent queue from growing too fast *)
      Atomic.incr n_dropped;
      true
    ) else (
      if self.size = 0 && Option.is_some self.timeout then
        (* current batch starts now *)
        self.start <- Mtime_clock.now ();

      (* add to queue *)
      self.size <- 1 + self.size;
      self.q <- x :: self.q;
      let ready = is_full_ self in
      ready
    )

  let push' self x = ignore (push self x : bool)
end

(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
from the program to whatever collector client we have. *)
module type EMITTER = sig
Expand Down Expand Up @@ -280,13 +199,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
else
None

let batch_traces : Trace.resource_spans list Batch.t =
let batch_traces : Trace.resource_spans Batch.t =
Batch.make ?batch:config.batch_traces ?timeout ()

let batch_metrics : Metrics.resource_metrics list Batch.t =
let batch_metrics : Metrics.resource_metrics Batch.t =
Batch.make ?batch:config.batch_metrics ?timeout ()

let batch_logs : Logs.resource_logs list Batch.t =
let batch_logs : Logs.resource_logs Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()

let on_tick_cbs_ = Atomic.make (AList.make ())
Expand Down Expand Up @@ -317,13 +236,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let send_logs_http client (l : Logs.resource_logs list) =
Conv.logs l |> send_http_ client ~url:config.url_logs

let maybe_pop ?force ~now batch =
Batch.pop_if_ready ?force ~now batch
|> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) [])

(* emit metrics, if the batch is full or timeout lapsed *)
let emit_metrics_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_metrics with
match Batch.pop_if_ready ?force ~now batch_metrics with
| None -> Lwt.return false
| Some l ->
let batch = !gc_metrics @ l in
Expand All @@ -332,14 +247,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
true

let emit_traces_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_traces with
match Batch.pop_if_ready ?force ~now batch_traces with
| None -> Lwt.return false
| Some l ->
let+ () = send_traces_http httpc l in
true

let emit_logs_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_logs with
match Batch.pop_if_ready ?force ~now batch_logs with
| None -> Lwt.return false
| Some l ->
let+ () = send_logs_http httpc l in
Expand Down Expand Up @@ -381,9 +296,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
can also be several user threads that produce spans and call
the emit functions. *)

(* Push elements [e] into batch [b]; a push rejected because the batch is at
   or over its high-water mark is recorded as an error.
   NOTE(review): the implementation this replaces counted drops in
   [n_dropped], not [n_errors] — confirm the counter change is intended. *)
let push_to_batch b e =
  match Batch.push b e with
  | `Ok -> ()
  | `Dropped -> Atomic.incr n_errors

let push_trace e =
let@ () = guard_exn_ "push trace" in
Batch.push' batch_traces e;
push_to_batch batch_traces e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_traces_maybe ~now httpc in
Expand All @@ -392,15 +312,15 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let push_metrics e =
let@ () = guard_exn_ "push metrics" in
sample_gc_metrics_if_needed ();
Batch.push' batch_metrics e;
push_to_batch batch_metrics e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_metrics_maybe ~now httpc in
())

let push_logs e =
let@ () = guard_exn_ "push logs" in
Batch.push' batch_logs e;
push_to_batch batch_logs e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_logs_maybe ~now httpc in
Expand Down
76 changes: 76 additions & 0 deletions src/client/batch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
(* Internal state of a batch. Plain mutable fields, no lock.
   NOTE(review): presumably safe only under single-domain / cooperative
   scheduling — confirm before sharing across domains. *)
type 'a t = {
  mutable size: int;  (* number of elements currently in [q] *)
  mutable q: 'a list;
      (* The queue is a FIFO represented as a list in reverse order *)
  batch: int; (* Minimum size to batch before popping *)
  high_watermark: int;  (* at/above this size, [push] drops new elements *)
  timeout: Mtime.span option;  (* max age of a non-empty batch before ready *)
  mutable start: Mtime.t;  (* when the current batch started filling *)
}

(* Default drop threshold derived from the batch size: a generous 100 when
   batching is effectively off (batch size 1), otherwise ten batches' worth. *)
let default_high_watermark batch_size =
  match batch_size with
  | 1 -> 100
  | n -> n * 10

(* [make ()] is a fresh, empty batch. [batch] (default 1) must be positive;
   [high_watermark] defaults from the batch size; [now] (default the current
   monotonic time) seeds the timeout window. *)
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
  let high_watermark =
    Option.value high_watermark ~default:(default_high_watermark batch)
  in
  let start =
    match now with
    | None -> Mtime_clock.now ()
    | Some t -> t
  in
  assert (batch > 0);
  { q = []; size = 0; batch; high_watermark; timeout; start }

(* True iff a timeout is configured and at least that much time separates
   [now] from the start of the current batch. *)
let timeout_expired_ ~now self : bool =
  match self.timeout with
  | None -> false
  | Some limit -> Mtime.Span.compare (Mtime.span now self.start) limit >= 0

(* The batch holds at least [batch] elements: big enough to send. *)
let is_full_ self : bool = self.batch <= self.size

(* A batch may be popped only when non-empty, and then when forced, full, or
   past its timeout. *)
let ready_to_pop ~force ~now self =
  if self.size = 0 then
    false
  else
    force || is_full_ self || timeout_expired_ ~now self

(* [pop_if_ready ?force ~now self] empties the queue and returns its contents
   in FIFO order when the batch is ready; [None] otherwise. *)
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
  if not (ready_to_pop ~force ~now self) then
    None
  else (
    assert (self.q <> []);
    (* [q] is kept in reverse push order; restore FIFO order on the way out. *)
    let popped = List.rev self.q in
    self.q <- [];
    self.size <- 0;
    Some popped
  )

(* Prepend every element of [elems] onto [q], counting them in the same pass;
   returns [(number_added, new_queue)]. *)
let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list =
  let rec go added acc = function
    | [] -> added, acc
    | x :: rest -> go (added + 1) (x :: acc) rest
  in
  go 0 q elems

(* [push self elems] enqueues every element of [elems], unless the batch is
   already at or over its high-water mark, in which case [elems] is dropped
   wholesale. Returns whether the elements were accepted. *)
let push (self : _ t) elems : [ `Dropped | `Ok ] =
  if self.size >= self.high_watermark then
    (* Refuse new elements so the queue cannot grow faster than it drains. *)
    `Dropped
  else (
    (* A fresh timed batch opens its timeout window at the first push. *)
    if self.size = 0 && Option.is_some self.timeout then
      self.start <- Mtime_clock.now ();

    let added, queue = append_with_count ~elems ~q:self.q in
    self.q <- queue;
    self.size <- self.size + added;
    `Ok
  )
52 changes: 52 additions & 0 deletions src/client/batch.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
(** A batch of resources to be popped when ready.

    NOTE(review): described as thread-safe, but the implementation uses plain
    mutable fields with no synchronization — presumably safe only under
    single-domain / cooperative scheduling; confirm before sharing across
    domains. *)

type 'a t

val make :
  ?batch:int ->
  ?high_watermark:int ->
  ?now:Mtime.t ->
  ?timeout:Mtime.span ->
  unit ->
  'a t
(** [make ()] is a new, empty batch.

    @param batch
      the number of elements after which the batch is considered {b full}, and
      ready to pop. It is required that [batch >= 1]. Default [1], in which
      case the batch is ready as soon as it holds any element (effectively no
      batching).

    @param high_watermark
      the batch size at and above which new elements will be [`Dropped] by
      {!push}. This prevents the queue from growing too fast for effective
      transmission in case of signal floods. Default
      [if batch = 1 then 100 else batch * 10].

    @param now the current time. Default [Mtime_clock.now ()].

    @param timeout
      the time span after which a batch is ready to pop, whether or not it is
      {b full}. *)

val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** [pop_if_ready ~now b] is [Some xs] — where [xs] holds, in FIFO order, all
    the elements {!push}ed since the last pop — if the batch is ready to be
    emitted, and [None] otherwise.

    A non-empty batch is ready to pop if

    - at least [batch] elements have been pushed, or
    - a [timeout] was provided, and more than a [timeout] span has passed
      since the current batch started, or
    - the pop is [force]d.

    @param now the current time

    @param force
      override the other batch conditions, for when we just want to emit
      batches before exit or because the user asks for it *)

val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
(** [push b xs] is [`Ok] if it succeeds in pushing all the values in [xs] into
    the batch [b], or [`Dropped] (discarding [xs]) if the batch already holds
    at least [high_watermark] elements (see {!make}). *)
2 changes: 1 addition & 1 deletion src/client/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(library
(name opentelemetry_client)
(public_name opentelemetry.client)
(libraries opentelemetry pbrt)
(libraries opentelemetry pbrt mtime mtime.clock.os)
(synopsis "Common types and logic shared between client implementations"))
Loading