Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
strict_validation,
warn_export_vars,
warn_exported_vars,
warn_untyped_record,
warn_missing_spec,
warn_missing_doc
warn_untyped_record
% warn_missing_spec,
% warn_missing_doc
]}.

{deps, []}.
Expand All @@ -29,6 +29,10 @@
deprecated_functions
]}.

{dialyzer, [
{warnings, [no_return, error_handling, unknown]}
]}.

{profiles, [
{test, [
{deps, [{proper, "1.4.0"}]},
Expand Down
16 changes: 16 additions & 0 deletions rebar.config.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
%% -*- erlang -*-
%% Dynamic rebar3 configuration. rebar3 evaluates this script with the parsed
%% contents of rebar.config bound to CONFIG and uses the value of the final
%% expression as the effective configuration.
%%
%% Purpose: prepend warn_missing_spec and warn_missing_doc to erl_opts only
%% when running on OTP release 27 or newer; on older releases erl_opts is
%% passed through unchanged.

%% Split the configuration into the current erl_opts (an empty list when the
%% key is absent) and every other entry.
{BaseOpts, RestConfig} =
    case lists:keytake(erl_opts, 1, CONFIG) of
        {value, {erl_opts, Existing}, Remaining} -> {Existing, Remaining};
        false -> {[], CONFIG}
    end,

%% Major OTP release of the running emulator, e.g. 27.
OtpRelease = list_to_integer(erlang:system_info(otp_release)),

%% Options that are only switched on for OTP 27+.
ExtraOpts =
    if
        OtpRelease >= 27 -> [warn_missing_spec, warn_missing_doc];
        true -> []
    end,

%% The resulting erl_opts entry always goes at the head of the configuration.
[{erl_opts, ExtraOpts ++ BaseOpts} | RestConfig].
11 changes: 7 additions & 4 deletions src/ddskerl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ DDSketch behaviour.
-if(?OTP_RELEASE >= 26).
?DOC("Options for the DDSketch.").
-type opts() :: #{atom() := dynamic()}.
?DOC("DDSketch instance.").
-type ddsketch() :: dynamic().
-else.
?DOC("Options for the DDSketch.").
-type opts() :: #{atom() := _}.
?DOC("DDSketch instance.").
-type ddsketch() :: _.
-endif.

?DOC("DDSketch instance.").
-type ddsketch() ::
ddskerl_std:ddsketch()
| ddskerl_bound:ddsketch()
| ddskerl_ets:ddsketch()
| ddskerl_counters:ddsketch().

-export_type([opts/0, ddsketch/0]).

?DOC("Create a new DDSketch instance.").
Expand Down
72 changes: 72 additions & 0 deletions src/ddskerl.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
%% Include guard: everything up to the matching -endif is processed only the
%% first time this header is included in a compilation unit. The guard macro
%% must be defined inside the guarded region; without the -define below the
%% -ifndef test would succeed on every inclusion and a second include would
%% try to redefine all records and macros in this file.
-ifndef('DDSKERL_HRL').
-define('DDSKERL_HRL', true).

%% State record for the `ddskerl_exact' sketch variant.
%% NOTE(review): the module that uses this record is not visible from this
%% header; the field notes below are read off the names and types only and
%% should be confirmed against that module.
-record(ddskerl_exact, {
%% Map from a number to a non-negative count; starts empty.
%% Presumably each inserted value mapped to its number of occurrences.
data = #{} :: #{number() => non_neg_integer()},
%% Presumably the number of inserted values; starts at 0.
total = 0 :: non_neg_integer(),
%% Presumably the running sum of inserted values; starts at 0.
sum = 0 :: number()
}).

%% State record for the `ddskerl_std' sketch variant.
%% NOTE(review): the module that uses this record is not visible from this
%% header; the field notes below are read off the names and types only and
%% should be confirmed against that module.
-record(ddskerl_std, {
%% Map from a non-negative integer key to a non-negative count; starts empty.
%% Presumably bucket index => number of values in that bucket.
data = #{} :: #{non_neg_integer() => non_neg_integer()},
%% Presumably the number of inserted values; starts at 0.
total = 0 :: non_neg_integer(),
%% Presumably the running sum of inserted values; starts at 0.
sum = 0 :: number(),
%% No default, so it is `undefined' until set; presumably the smallest value seen.
min :: undefined | number(),
%% Starts at 0; presumably the largest value seen.
max = 0 :: number(),
%% No default: must be supplied when the record is created.
%% Presumably (1 + Err) / (1 - Err), as in ddskerl_bound:new/1 — TODO confirm.
gamma :: float(),
%% No default: must be supplied when the record is created.
%% Presumably 1.0 / math:log2(gamma), as in ddskerl_bound:new/1 — TODO confirm.
inv_log_gamma :: float()
}).

%% State record for the `ddskerl_bound' sketch; this is the representation
%% behind the opaque type ddskerl_bound:ddsketch().
-record(ddskerl_bound, {
%% gb_trees tree with non-negative integer keys and counts; starts empty.
%% Presumably bucket index => number of values in that bucket.
data = gb_trees:empty() :: gb_trees:tree(non_neg_integer(), non_neg_integer()),
%% Value returned by ddskerl_bound:total/1; starts at 0.
total = 0 :: non_neg_integer(),
%% Value returned by ddskerl_bound:quantile/2 for quantile 0.0.
%% No default, so it is `undefined' on a fresh sketch.
min :: undefined | number(),
%% Value returned by ddskerl_bound:quantile/2 for quantile 1.0; starts at 0.
max = 0 :: number(),
%% Value returned by ddskerl_bound:sum/1; starts at 0.
sum = 0 :: number(),
%% Taken verbatim from the `bound' option in ddskerl_bound:new/1.
%% Presumably the maximum number of buckets kept — TODO confirm.
bound :: non_neg_integer(),
%% Set by ddskerl_bound:new/1 to (1 + Err) / (1 - Err), where Err is the `error' option.
gamma :: float(),
%% Set by ddskerl_bound:new/1 to 1.0 / math:log2(gamma).
inv_log_gamma :: float()
}).

%% Position constants with the E_ prefix.
%% NOTE(review): the code that uses these macros is not visible from this
%% header. Presumably they are element positions inside the tuple stored in
%% the ETS table of ddskerl_ets, with position 1 holding the key — TODO confirm.
%% Going by the names:
%% - total keeps track of the total count
%% - overflow counts values that escape the summary above the maximum bucket
%% - underflow counts values that escape the summary below the minimum bucket
-define(E_BOUND_POS, 2).
-define(E_GAMMA_POS, 3).
-define(E_INV_LOG_GAMMA_POS, 4).
-define(E_TOTAL_POS, 5).
-define(E_SUM_POS, 6).
-define(E_MIN_POS, 7).
-define(E_MAX_POS, 8).
-define(E_OVERFLOW_POS, 9).
-define(E_UNDERFLOW_POS, 10).
%% Same value as E_UNDERFLOW_POS; presumably the offset after which the
%% per-bucket elements start — TODO confirm.
-define(E_PREFIX, 10).
%% Integer limits: 0 and 2^64 - 1 (`bsl' binds tighter than binary `-',
%% so the expression reads as (1 bsl 64) - 1).
-define(E_MIN_INT, (0)).
-define(E_MAX_INT, (1 bsl 64 - 1)).

%% Handle record for the `ddskerl_ets' sketch variant.
%% NOTE(review): the module that uses this record is not visible from this
%% header; confirm the field notes below against it.
-record(ddskerl_ets, {
%% ETS table identifier or name; presumably the table holding the sketch data.
ref :: ets:tab(),
%% Any term; presumably the key identifying this sketch inside the table.
name :: term()
}).

%% Layout of the `counters' array used by ddskerl_counters (1-based indices):
%% - C_TOTAL_POS keeps track of the total count
%% - C_SUM_POS keeps track of the sum of the inserted values
%% - C_UNDERFLOW_POS counts values that escape the summary below the minimum
%%   bucket: the interval (0,1]
%% - in between we find all the buckets, at C_UNDERFLOW_POS + Key
%% - C_OVERFLOW_POS(Bound) counts values that escape the summary above the
%%   maximum bucket
-define(C_TOTAL_POS, 1).
-define(C_SUM_POS, 2).
-define(C_UNDERFLOW_POS, 3).
%% Number of slots that are not regular buckets (total, sum, underflow,
%% overflow); the array is created with ?C_EXTRA_KEYS + Bound slots.
-define(C_EXTRA_KEYS, 4).
%% Position the quantile scan starts from; a bucket key is recovered from a
%% position as Pos - ?C_PREFIX.
-define(C_PREFIX, 3).
%% Fully parenthesised so the expansion stays a single operand no matter which
%% operators surround the macro call or appear in its argument.
-define(C_OVERFLOW_POS(Bound), (?C_EXTRA_KEYS + (Bound))).
%% Largest unsigned 64-bit integer, 2^64 - 1; used as the initial value of the
%% per-scheduler minimum slots.
-define(C_MAX_INT, ((1 bsl 64) - 1)).

%% Handle record for the `ddskerl_counters' sketch. All fields are set once by
%% ddskerl_counters:new/1; the mutable data lives in the referenced `counters'
%% and `atomics' arrays, not in the record itself.
-record(ddskerl_counters, {
%% `counters' array with ?C_EXTRA_KEYS + bound slots, created with
%% write_concurrency; holds total, sum, underflow, the buckets and overflow.
ref :: counters:counters_ref(),
%% Unsigned `atomics' array with `width' slots. For scheduler id S, slot
%% 2*S - 1 is its minimum (initialised to ?C_MAX_INT) and slot 2*S is its
%% maximum (set to 0 on reset).
min_max :: atomics:atomics_ref(),
%% Size of `min_max': 2 * erlang:system_info(schedulers).
width :: non_neg_integer(),
%% Taken from the `bound' option. On insert, a value whose bucket key is
%% =< bound is counted in its own bucket, anything larger in the overflow slot.
bound :: non_neg_integer(),
%% (1 + Err) / (1 - Err), where Err is the `error' option.
gamma :: float(),
%% 1.0 / math:log2(gamma); multiplied by log2 of a value to get its bucket key.
inv_log_gamma :: float()
}).

-endif.
29 changes: 10 additions & 19 deletions src/ddskerl_bound.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,48 +47,39 @@ start_link(Opts) ->
> ```
""").

-include("./ddskerl.hrl").

-behaviour(ddskerl).

-export([new/1, total/1, sum/1, insert/2, merge/2, quantile/2]).

-record(ddskerl_bound, {
data = gb_trees:empty() :: gb_trees:tree(non_neg_integer(), non_neg_integer()),
total = 0 :: non_neg_integer(),
min :: undefined | number(),
max = 0 :: number(),
sum = 0 :: number(),
bound :: non_neg_integer(),
gamma :: float(),
inv_log_gamma :: float()
}).

?DOC("Options for the DDSketch.").
-type opts() :: #{error := float(), bound := non_neg_integer()}.

?DOC("DDSketch instance.").
-opaque ddskerl_bound() :: #ddskerl_bound{}.
-opaque ddsketch() :: #ddskerl_bound{}.

-export_type([ddskerl_bound/0, opts/0]).
-export_type([ddsketch/0, opts/0]).

?DOC("Create a new DDSketch instance.").
-spec new(opts()) -> ddskerl_bound().
-spec new(opts()) -> ddsketch().
new(#{error := Err, bound := Bound}) ->
Gamma = (1 + Err) / (1 - Err),
InvLogGamma = 1.0 / math:log2(Gamma),
#ddskerl_bound{bound = Bound, gamma = Gamma, inv_log_gamma = InvLogGamma}.

?DOC("Get the total number of elements in the DDSketch.").
-spec total(ddskerl_bound()) -> non_neg_integer().
-spec total(ddsketch()) -> non_neg_integer().
total(#ddskerl_bound{total = Total}) ->
Total.

?DOC("Get the sum of elements in the DDSketch.").
-spec sum(ddskerl_bound()) -> number().
-spec sum(ddsketch()) -> number().
sum(#ddskerl_bound{sum = Sum}) ->
Sum.

?DOC("Insert a value into the DDSketch.").
-spec insert(ddskerl_bound(), number()) -> ddskerl_bound().
-spec insert(ddsketch(), number()) -> ddsketch().
insert(
#ddskerl_bound{
data = Data,
Expand Down Expand Up @@ -131,7 +122,7 @@ insert(
end.

?DOC("Calculate the quantile of a DDSketch.").
-spec quantile(ddskerl_bound(), float()) -> float() | undefined.
-spec quantile(ddsketch(), float()) -> float() | undefined.
quantile(#ddskerl_bound{min = Min}, +0.0) ->
Min;
quantile(#ddskerl_bound{max = Max}, 1.0) ->
Expand Down Expand Up @@ -163,7 +154,7 @@ get_quantile(Data, Gamma, TotalQuantile, AccRank, {Pos, Count, NextIter}) ->
end.

?DOC("Merge two DDSketch instances.").
-spec merge(ddskerl_bound(), ddskerl_bound()) -> ddskerl_bound().
-spec merge(ddsketch(), ddsketch()) -> ddsketch().
merge(
#ddskerl_bound{
data = Data1, total = Total1, sum = Sum1, bound = Bound, gamma = G, min = Min1, max = Max1
Expand Down
57 changes: 19 additions & 38 deletions src/ddskerl_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,12 @@ If we're measuring picoseconds, this would suffice to measure 107 days.
> ```
""").

-include("./ddskerl.hrl").

-behaviour(ddskerl).

-export([new/1, total/1, sum/1, insert/2, merge/2, reset/1, quantile/2]).

-record(ddskerl_counters, {
ref :: counters:counters_ref(),
min_max :: atomics:atomics_ref(),
width :: non_neg_integer(),
bound :: non_neg_integer(),
gamma :: float(),
inv_log_gamma :: float()
}).

%% - total keeps track of the total count
%% - underflow of values that escape the summary below the minimum bucket: the interval (0,1]
%% - in between we find all the buckets
%% - overflow of values that escape the summary above the maximum bucket
-define(TOTAL_POS, 1).
-define(SUM_POS, 2).
-define(UNDERFLOW_POS, 3).
-define(EXTRA_KEYS, 4).
-define(PREFIX, 3).
-define(OVERFLOW_POS(Bound), ?EXTRA_KEYS + Bound).
-define(MAX_INT, (1 bsl 64 - 1)).

?DOC("Options for the DDSketch").
-type opts() :: #{error := float(), bound := non_neg_integer()}.

Expand All @@ -96,8 +77,8 @@ new(#{error := Err, bound := Bound}) ->
%% So that all future comparisons will succeed.
Width = 2 * erlang:system_info(schedulers),
MinMax = atomics:new(Width, [{signed, false}]),
[atomics:put(MinMax, Ix, ?MAX_INT) || Ix <- lists:seq(1, Width, 2)],
Ref = counters:new(?EXTRA_KEYS + Bound, [write_concurrency]),
[atomics:put(MinMax, Ix, ?C_MAX_INT) || Ix <- lists:seq(1, Width, 2)],
Ref = counters:new(?C_EXTRA_KEYS + Bound, [write_concurrency]),
Gamma = (1 + Err) / (1 - Err),
InvLogGamma = 1.0 / math:log2(Gamma),
#ddskerl_counters{
Expand All @@ -112,26 +93,26 @@ new(#{error := Err, bound := Bound}) ->
?DOC("Get the total number of elements in the DDSketch").
-spec total(ddsketch()) -> non_neg_integer().
total(#ddskerl_counters{ref = Ref}) ->
counters:get(Ref, ?TOTAL_POS).
counters:get(Ref, ?C_TOTAL_POS).

?DOC("Get the sum of elements in the DDSketch").
-spec sum(ddsketch()) -> non_neg_integer().
sum(#ddskerl_counters{ref = Ref}) ->
counters:get(Ref, ?SUM_POS).
counters:get(Ref, ?C_SUM_POS).

?DOC("Reset the DDSketch values to zero").
-spec reset(ddsketch()) -> ddsketch().
reset(#ddskerl_counters{ref = Ref, min_max = MinMax, bound = Bound, width = Width} = S) ->
[atomics:put(MinMax, Ix, ?MAX_INT) || Ix <- lists:seq(1, Width, 2)],
[atomics:put(MinMax, Ix, ?C_MAX_INT) || Ix <- lists:seq(1, Width, 2)],
[atomics:put(MinMax, Ix, 0) || Ix <- lists:seq(2, Width, 2)],
[counters:put(Ref, Pos, 0) || Pos <- lists:seq(?TOTAL_POS, ?OVERFLOW_POS(Bound))],
[counters:put(Ref, Pos, 0) || Pos <- lists:seq(?C_TOTAL_POS, ?C_OVERFLOW_POS(Bound))],
S.

?DOC("Insert a value into the DDSketch").
-spec insert(ddsketch(), Value :: number()) -> ddsketch().
insert(#ddskerl_counters{ref = Ref, min_max = MinMax} = S, Val) when 0 < Val, Val =< 1 ->
counters:add(Ref, ?TOTAL_POS, 1),
counters:add(Ref, ?UNDERFLOW_POS, 1),
counters:add(Ref, ?C_TOTAL_POS, 1),
counters:add(Ref, ?C_UNDERFLOW_POS, 1),
update_min_max_sum(Ref, MinMax, round(Val), erlang:system_info(scheduler_id)),
S;
insert(
Expand All @@ -141,21 +122,21 @@ insert(
1 < Val
->
Key = ceil(math:log2(Val) * InvLogGamma),
counters:add(Ref, ?TOTAL_POS, 1),
counters:add(Ref, ?C_TOTAL_POS, 1),
update_min_max_sum(Ref, MinMax, round(Val), erlang:system_info(scheduler_id)),
case Key =< Bound of
true ->
counters:add(Ref, ?UNDERFLOW_POS + Key, 1);
counters:add(Ref, ?C_UNDERFLOW_POS + Key, 1);
false ->
counters:add(Ref, ?OVERFLOW_POS(Bound), 1)
counters:add(Ref, ?C_OVERFLOW_POS(Bound), 1)
end,
S.

-spec update_min_max_sum(
counters:counters_ref(), atomics:atomics_ref(), non_neg_integer(), non_neg_integer()
) -> ok.
update_min_max_sum(Ref, MinMax, Value, SchedulerId) ->
counters:add(Ref, ?SUM_POS, Value),
counters:add(Ref, ?C_SUM_POS, Value),
MinIndex = 2 * SchedulerId - 1,
MaxIndex = 2 * SchedulerId,
Min = atomics:get(MinMax, MinIndex),
Expand Down Expand Up @@ -203,11 +184,11 @@ quantile(#ddskerl_counters{min_max = MinMax, width = Width}, 1.0) ->
quantile(#ddskerl_counters{ref = Ref, bound = Bound, gamma = Gamma}, Quantile) when
0 < Quantile, Quantile < 1
->
Total = counters:get(Ref, ?TOTAL_POS),
AccRank = counters:get(Ref, ?UNDERFLOW_POS),
Total = counters:get(Ref, ?C_TOTAL_POS),
AccRank = counters:get(Ref, ?C_UNDERFLOW_POS),
TotalQuantile = Total * Quantile,
ToIndex = ?OVERFLOW_POS(Bound) + 1,
get_quantile(Ref, Gamma, TotalQuantile, AccRank, ?PREFIX, ToIndex).
ToIndex = ?C_OVERFLOW_POS(Bound) + 1,
get_quantile(Ref, Gamma, TotalQuantile, AccRank, ?C_PREFIX, ToIndex).

-spec get_quantile(
counters:counters_ref(),
Expand All @@ -221,7 +202,7 @@ quantile(#ddskerl_counters{ref = Ref, bound = Bound, gamma = Gamma}, Quantile) w
get_quantile(_, _, _, _, End, End) ->
undefined;
get_quantile(_, Gamma, TotalQuantile, AccRank, Pos, _) when TotalQuantile =< AccRank ->
result(Gamma, Pos - ?PREFIX);
result(Gamma, Pos - ?C_PREFIX);
get_quantile(Ref, Gamma, TotalQuantile, AccRank, Pos, OverflowPos) ->
NewPos = Pos + 1,
Value = counters:get(Ref, NewPos),
Expand Down
Loading