diff --git a/include/opentelemetry.hrl b/include/opentelemetry.hrl index 716f7cc6..4f58f0ec 100644 --- a/include/opentelemetry.hrl +++ b/include/opentelemetry.hrl @@ -146,3 +146,6 @@ %% developer-facing error message message :: unicode:unicode_binary() }). + +%% table to store active spans +-define(SPAN_TAB, otel_span_table). diff --git a/src/opentelemetry.app.src b/src/opentelemetry.app.src index 3bd3e164..6fd83a34 100644 --- a/src/opentelemetry.app.src +++ b/src/opentelemetry.app.src @@ -10,7 +10,9 @@ ]}, {env, [{tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, ctx => {ot_ctx_pdict, []}}}}, - {sampler, {always_on, #{}}}]}, + {sampler, {always_on, #{}}}, + {exporter, [{exporters, []}, + {scheduled_delay_ms, 30000}]}]}, {modules, []}, {licenses, ["Apache 2.0"]}, diff --git a/src/opentelemetry.erl b/src/opentelemetry.erl index 04454502..92ae3252 100644 --- a/src/opentelemetry.erl +++ b/src/opentelemetry.erl @@ -41,6 +41,8 @@ span_kind/0, link/0, links/0, + attribute_key/0, + attribute_value/0, attributes/0, annotation/0, time_events/0, @@ -59,8 +61,10 @@ -type span() :: #span{}. -type span_name() :: unicode:unicode_binary(). +-type attribute_key() :: unicode:unicode_binary(). -type attribute_value() :: any(). --type attributes() :: [{unicode:unicode_binary(), attribute_value()}]. +-type attribute() :: {unicode:unicode_binary(), attribute_value()}. +-type attributes() :: [attribute()]. -type annotation() :: #annotation{}. 
-type span_kind() :: ?SPAN_KIND_INTERNAL | diff --git a/src/opentelemetry_sup.erl b/src/opentelemetry_sup.erl index 60098742..a3a4eebb 100644 --- a/src/opentelemetry_sup.erl +++ b/src/opentelemetry_sup.erl @@ -32,9 +32,19 @@ init([Children, Opts]) -> SupFlags = #{strategy => one_for_one, intensity => 0, period => 1}, + + ExporterOpts = proplists:get_value(exporter, Opts, []), + Exporter = #{id => ot_exporter, + start => {ot_exporter, start_link, [ExporterOpts]}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [ot_exporter]}, + ChildSpecs = [#{id => ot_span_sup, start => {ot_span_sup, start_link, [Opts]}, - type => supervisor} | Children], + type => supervisor}, + Exporter | Children], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/src/ot_exporter.erl b/src/ot_exporter.erl new file mode 100644 index 00000000..b1779e74 --- /dev/null +++ b/src/ot_exporter.erl @@ -0,0 +1,258 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc This module has the behaviour that each exporter must implement +%% and creates the buffer of trace spans to be exported. +%% +%% The exporter process can be configured to export the current finished +%% spans based on timeouts and the size of the finished spans table. +%% +%% Timeouts: +%% exporting_timeout_ms: How long to let the exports run before killing. 
+%% check_table_size_ms: Timeout to check the size of the export table.
+%% scheduled_delay_ms: How often to trigger running the exporters.
+%%
+%% The size limit of the current table where finished spans are stored can
+%% be configured with the `max_queue_size' option.
+%% @end
+%%%-----------------------------------------------------------------------
+-module(ot_exporter).
+
+-behaviour(gen_statem).
+
+-compile({no_auto_import, [register/2]}).
+
+-export([start_link/1,
+         store_span/1,
+         register/1,
+         register/2]).
+
+-export([init/1,
+         callback_mode/0,
+         idle/3,
+         exporting/3,
+         terminate/3]).
+
+-include("opentelemetry.hrl").
+-include_lib("kernel/include/logger.hrl").
+
+%% behaviour for exporters to implement
+-type opts() :: term().
+
+%% what an exporter hands back from `export'
+-type export_result() :: ok | success | failed_not_retryable | failed_retryable.
+
+%% an exporter as kept in the server state: `init_exporter/1' turns a module
+%% into `fun Module:export/2', and plain funs are stored as they are
+-type exporter_fun() :: fun((ets:tab(), opts()) -> export_result() | term()).
+
+%% Do any initialization of the exporter here and return configuration
+%% that will be passed along with the table of spans to the `export' function.
+-callback init(term()) -> opts().
+
+%% This function is called when the configured interval expires with the
+%% table of spans that have been collected so far and the configuration
+%% returned in `init'. The table is passed by name (the runner renames it
+%% before exporting), hence `ets:tab()' and not `ets:tid()'.
+%% Do whatever needs to be done to export each span here, the caller will block
+%% until it returns.
+-callback export(ets:tab(), opts()) -> export_result().
+
+%% exporters:            exporters to run, each paired with its configuration
+%% handed_off_table:     name of the table given to the current runner, if any
+%% runner_pid:           process running the current export, if any
+%% max_queue_size:       table size limit, or `infinity' for no limit
+%% exporting_timeout_ms: how long a runner may take before it is killed
+%% check_table_size_ms:  interval for checking the size of the current table
+%% scheduled_delay_ms:   interval between exports
+-record(data, {exporters :: [{exporter_fun(), opts()}],
+               handed_off_table :: atom() | undefined,
+               runner_pid :: pid() | undefined,
+               max_queue_size :: integer() | infinity,
+               exporting_timeout_ms :: integer(),
+               check_table_size_ms :: integer() | infinity,
+               scheduled_delay_ms :: integer()}).
+
+%% the name of the table spans are currently written to is kept in a
+%% persistent term under this key
+-define(CURRENT_TABLES_KEY, {?MODULE, current_table}).
+-define(TABLE_1, ot_export_table1).
+-define(TABLE_2, ot_export_table2).
+-define(CURRENT_TABLE, persistent_term:get(?CURRENT_TABLES_KEY)).
+
+-define(DEFAULT_MAX_QUEUE_SIZE, 2048).
+-define(DEFAULT_SCHEDULED_DEPLAY_MS, timer:seconds(5)).
+-define(DEFAULT_EXPORTING_TIMEOUT, timer:minutes(5)).
+-define(DEFAULT_CHECK_TABLE_SIZE_INTERVAL, infinity).
+
+%% @doc Start the exporter state machine, registered locally under the
+%% module name. `Opts' is the proplist read by `init/1'.
+start_link(Opts) ->
+    gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
+
+%% @equiv register(Exporter, [])
+register(Exporter) ->
+    register(Exporter, []).
+
+%% @doc Register new traces exporter `Exporter' with `Options'.
+%% `Exporter' is either a module implementing this behaviour or a fun of
+%% arity 2. It is initialized through `init_exporter/1' in the calling
+%% process before the request is sent to the exporter server.
+-spec register(module() | function(), term()) -> ok.
+register(Exporter, Options) ->
+    gen_statem:call(?MODULE, {register, init_exporter({Exporter, Options})}).
+
+%% @doc Insert a finished span into the table currently collecting spans.
+%% Returns `{error, no_export_buffer}' when looking up or writing to the
+%% current table fails with `badarg', and `{error, invalid_span}' for
+%% anything that is not a `#span{}' record.
+-spec store_span(opentelemetry:span()) -> true | {error, invalid_span} | {error, no_export_buffer}.
+store_span(Span=#span{}) ->
+    try
+        ets:insert(?CURRENT_TABLE, Span)
+    catch
+        error:badarg ->
+            {error, no_export_buffer}
+    end;
+store_span(_) ->
+    {error, invalid_span}.
+
+%% gen_statem callback. Reads the options (falling back to the defaults
+%% defined above), creates both export tables, publishes the first one as
+%% the current table and starts in the `idle' state.
+init([Args]) ->
+    %% exits of the linked runner process arrive as messages
+    process_flag(trap_exit, true),
+
+    SizeLimit = proplists:get_value(max_queue_size, Args, ?DEFAULT_MAX_QUEUE_SIZE),
+    ExportingTimeout = proplists:get_value(exporting_timeout_ms, Args, ?DEFAULT_EXPORTING_TIMEOUT),
+    ScheduledDelay = proplists:get_value(scheduled_delay_ms, Args, ?DEFAULT_SCHEDULED_DEPLAY_MS),
+    CheckTableSize = proplists:get_value(check_table_size_ms, Args, ?DEFAULT_CHECK_TABLE_SIZE_INTERVAL),
+    Exporters = [init_exporter(Config) || Config <- proplists:get_value(exporters, Args, [])],
+
+    _Tid1 = new_export_table(?TABLE_1),
+    _Tid2 = new_export_table(?TABLE_2),
+    persistent_term:put(?CURRENT_TABLES_KEY, ?TABLE_1),
+
+    {ok, idle, #data{exporters=Exporters,
+                     handed_off_table=undefined,
+                     %% the configured limit is divided by the word size so
+                     %% it can be compared with `ets:info(Tab, memory)'
+                     max_queue_size=case SizeLimit of
+                                        infinity -> infinity;
+                                        _ -> SizeLimit div erlang:system_info(wordsize)
+                                    end,
+                     exporting_timeout_ms=ExportingTimeout,
+                     check_table_size_ms=CheckTableSize,
+                     scheduled_delay_ms=ScheduledDelay}}.
+
+callback_mode() ->
+    [state_functions, state_enter].
+
+%% `idle' state: nothing is being exported. Entering the state arms the
+%% export timer; when it fires the machine moves to `exporting'. Every
+%% other event goes to the shared handler.
+idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) ->
+    {keep_state_and_data, [{{timeout, export_spans}, SendInterval, export_spans}]};
+idle(_, export_spans, Data) ->
+    {next_state, exporting, Data};
+idle(EventType, Event, Data) ->
+    handle_event_(idle, EventType, Event, Data).
+
+%% `exporting' state: a runner process owns the handed-off table and is
+%% running the exporters over it.
+%%
+%% an export tick that fires while an export is still running is postponed
+exporting({timeout, export_spans}, export_spans, _Data) ->
+    {keep_state_and_data, [postpone]};
+%% on entry: hand the current table to a new runner, remember both, then arm
+%% the runner deadline and the next export tick
+exporting(enter, _PreviousState, #data{exporting_timeout_ms=Deadline,
+                                       scheduled_delay_ms=Interval} = Data0) ->
+    {HandedOff, Runner} = export_spans(Data0),
+    Data1 = Data0#data{runner_pid=Runner,
+                       handed_off_table=HandedOff},
+    Actions = [{state_timeout, Deadline, exporting_timeout},
+               {{timeout, export_spans}, Interval, export_spans}],
+    {keep_state, Data1, Actions};
+exporting(state_timeout, exporting_timeout, #data{handed_off_table=HandedOff} = Data0) ->
+    %% the runner is taking too long: kill it, which deletes the table it
+    %% was given, create that table again and repeat the state so another
+    %% export starts immediately
+    Data1 = kill_runner(Data0),
+    new_export_table(HandedOff),
+    {repeat_state, Data1};
+%% the pid in the message must be the runner on record: an exit signal may
+%% have been sent after kill_runner was called but before the unlink was done
+exporting(info, {'EXIT', Runner, _Reason}, #data{runner_pid=Runner} = Data) ->
+    complete_exporting([], Data);
+%% same check for the completion message of a runner
+exporting(info, {completed, Runner, Failed}, #data{runner_pid=Runner} = Data) ->
+    complete_exporting(Failed, Data);
+%% everything else goes to the shared handler
+exporting(Type, Content, Data) ->
+    handle_event_(exporting, Type, Content, Data).
+
+%% Events handled the same way in both states.
+%%
+%% table size check: nothing to do when no limit is configured
+handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) ->
+    keep_state_and_data;
+%% table size check: when the current table has reached the limit, start an
+%% export right away (from `idle') or restart the running one (from `exporting')
+handle_event_(State, {timeout, check_table_size}, check_table_size, Data=#data{max_queue_size=SizeLimit}) ->
+    case ets:info(?CURRENT_TABLE, memory) of
+        M when M >= SizeLimit, State =:= idle ->
+            %% there is no runner in `idle'; kill_runner/1 tolerates that
+            Data1 = kill_runner(Data),
+            {next_state, exporting, Data1};
+        M when M >= SizeLimit, State =:= exporting ->
+            %% NOTE(review): unlike the exporting_timeout handling, the
+            %% handed-off table is not created again here before the state
+            %% is repeated - confirm whether that is intended
+            Data1 = kill_runner(Data),
+            {repeat_state, Data1};
+        _ ->
+            keep_state_and_data
+    end;
+%% add an already initialized exporter (see register/2)
+handle_event_(_, {call, From}, {register, Exporter}, Data=#data{exporters=Exporters}) ->
+    {keep_state, Data#data{exporters=[Exporter | Exporters]}, [{reply, From, ok}]};
+handle_event_(_, _, _, _) ->
+    keep_state_and_data.
+
+terminate(_, _, _Data) ->
+    ok.
+
+%%
+
+%% An export finished: create the handed-off table again, drop the exporters
+%% reported as failed and go back to `idle'.
+complete_exporting(FailedExporters, Data=#data{exporters=Exporters,
+                                               handed_off_table=ExportingTable})
+  when ExportingTable =/= undefined ->
+    new_export_table(ExportingTable),
+    {next_state, idle, Data#data{exporters=Exporters--FailedExporters,
+                                 runner_pid=undefined,
+                                 handed_off_table=undefined}}.
+
+%% Kill the runner, if there is one, and forget it together with the table
+%% it was given. Unlinking first keeps the exit signal of the kill from
+%% being delivered as an 'EXIT' message.
+kill_runner(Data=#data{runner_pid=undefined}) ->
+    %% no runner (e.g. called from `idle'); erlang:unlink/1 on `undefined'
+    %% would fail with badarg and crash the exporter
+    Data#data{handed_off_table=undefined};
+kill_runner(Data=#data{runner_pid=RunnerPid}) ->
+    erlang:unlink(RunnerPid),
+    erlang:exit(RunnerPid, kill),
+    Data#data{runner_pid=undefined,
+              handed_off_table=undefined}.
+
+%% Create one of the named, public tables finished spans are written to.
+new_export_table(Name) ->
+    ets:new(Name, [public, named_table, {write_concurrency, true}, duplicate_bag]).
+
+%% Normalize an exporter specification to `{ExportFun, Config}'. A module is
+%% initialized through its `init/1' callback; a fun is used as it is, with
+%% `[]' as configuration when none is given.
+init_exporter({Exporter, Config}) when is_atom(Exporter) ->
+    {fun Exporter:export/2, Exporter:init(Config)};
+init_exporter(Exporter) when is_atom(Exporter) ->
+    {fun Exporter:export/2, Exporter:init([])};
+init_exporter(Exporter) when is_function(Exporter) ->
+    {Exporter, []};
+init_exporter({Exporter, Config}) when is_function(Exporter) ->
+    {Exporter, Config}.
+
+%% Make the other table the current one, then give the table that was
+%% current to a freshly spawned, linked runner. Returns the name of the
+%% table given away and the runner's pid.
+export_spans(#data{exporters=Exporters}) ->
+    Outgoing = ?CURRENT_TABLE,
+    Incoming = case Outgoing of
+                   ?TABLE_1 -> ?TABLE_2;
+                   ?TABLE_2 -> ?TABLE_1
+               end,
+
+    %% an atom is a single word so this does not trigger a global GC
+    persistent_term:put(?CURRENT_TABLES_KEY, Incoming),
+
+    Parent = self(),
+    Runner = erlang:spawn_link(fun() -> send_spans(Parent, Exporters) end),
+    ets:give_away(Outgoing, Runner, export),
+    {Outgoing, Runner}.
+
+%% Body of the runner process: wait for the table, rename it, run every
+%% exporter over it, delete it and report the exporters that failed.
+%% Running in a separate process also means calls to `register' do not time
+%% out when the actual exporting takes longer than the call timeout.
+send_spans(FromPid, Exporters) ->
+    receive
+        {'ETS-TRANSFER', Table, FromPid, export} ->
+            SendTable = ets:rename(Table, current_send_table),
+            HasFailed = fun({Exporter, Config}) ->
+                                export(Exporter, SendTable, Config)
+                        end,
+            Failed = lists:filtermap(HasFailed, Exporters),
+            ets:delete(SendTable),
+            completed(FromPid, Failed)
+    end.
+
+%% Tell the exporter server which exporters failed during this run.
+completed(FromPid, FailedExporters) ->
+    FromPid ! {completed, self(), FailedExporters}.
+
+%% Run one exporter. The result is `true' when the exporter is to be
+%% dropped: it is `undefined', it returned `failed_not_retryable', or it
+%% raised an exception (which is logged, so that it cannot crash the runner).
+export(undefined, _SpansTid, _Config) ->
+    true;
+export(Exporter, SpansTid, Config) ->
+    try Exporter(SpansTid, Config) of
+        Result ->
+            Result =:= failed_not_retryable
+    catch
+        Class:Exception:StackTrace ->
+            ?LOG_INFO("dropping exporter that threw exception: exporter=~p ~p:~p stacktrace=~p",
+                      [Exporter, Class, Exception, StackTrace]),
+            true
+    end.
diff --git a/src/ot_exporter_pid.erl b/src/ot_exporter_pid.erl
new file mode 100644
index 00000000..1b72aa5b
--- /dev/null
+++ b/src/ot_exporter_pid.erl
@@ -0,0 +1,32 @@
+%%%------------------------------------------------------------------------
+%% Copyright 2019, OpenTelemetry Authors
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% @doc A test reporter for sending trace spans to an Erlang PID as message.
+%% @end
+%%%-----------------------------------------------------------------------
+-module(ot_exporter_pid).
+
+-behaviour(ot_exporter).
+
+-export([init/1,
+         export/2]).
+
+%% The exporter configuration is the pid the spans are sent to.
+init(Pid) ->
+    Pid.
+
+%% Send every object of the table to `Pid' as a `{span, Span}' message.
+export(SpansTid, Pid) ->
+    Forward = fun(Span, _Acc) -> Pid ! {span, Span} end,
+    ets:foldl(Forward, [], SpansTid),
+    ok.
diff --git a/src/ot_exporter_stdout.erl b/src/ot_exporter_stdout.erl
new file mode 100644
index 00000000..77e9fd89
--- /dev/null
+++ b/src/ot_exporter_stdout.erl
@@ -0,0 +1,32 @@
+%%%------------------------------------------------------------------------
+%% Copyright 2019, OpenTelemetry Authors
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% @doc Reporter that prints spans to stdout.
+%% @end
+%%%-----------------------------------------------------------------------
+-module(ot_exporter_stdout).
+
+-behaviour(ot_exporter).
+
+-export([init/1,
+         export/2]).
+
+%% No configuration is needed; whatever is passed in is ignored.
+init(_) ->
+    ok.
+
+%% Print every object of the table with the `~p' format, one per line.
+export(SpansTid, _) ->
+    Print = fun(Span, _Acc) -> io:format("~p~n", [Span]) end,
+    ets:foldl(Print, [], SpansTid),
+    ok.
diff --git a/src/ot_exporter_tab.erl b/src/ot_exporter_tab.erl
new file mode 100644
index 00000000..fccff10a
--- /dev/null
+++ b/src/ot_exporter_tab.erl
@@ -0,0 +1,32 @@
+%%%------------------------------------------------------------------------
+%% Copyright 2019, OpenTelemetry Authors
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% @doc A test reporter that keeps finished spans in an ETS table.
+%% @end
+%%%-----------------------------------------------------------------------
+-module(ot_exporter_tab).
+
+-behaviour(ot_exporter).
+
+-export([init/1,
+         export/2]).
+
+%% The exporter configuration is the table the spans are copied into.
+init(Tid) ->
+    Tid.
+
+%% Insert every object of the exported table into the configured table.
+export(SpansTid, Tid) ->
+    Copy = fun(Span, _Acc) -> ets:insert(Tid, Span) end,
+    ets:foldl(Copy, [], SpansTid),
+    ok.
diff --git a/src/ot_span.erl b/src/ot_span.erl
index 66e4cddd..ce7fcb0f 100644
--- a/src/ot_span.erl
+++ b/src/ot_span.erl
@@ -22,6 +22,7 @@
          finish_span/2,
          get_ctx/2,
          is_recording_events/2,
+         set_attribute/4,
          set_attributes/3,
          add_events/3,
          add_links/3,
@@ -37,13 +38,16 @@
 -export_type([start_opts/0]).
 
 -callback start_span(opentelemetry:span_name(), start_opts()) -> opentelemetry:span_ctx().
--callback finish_span(opentelemetry:span_ctx()) -> ok.
+-callback finish_span(opentelemetry:span_ctx()) -> boolean() | {error, term()}.
 -callback get_ctx(opentelemetry:span()) -> opentelemetry:span_ctx().
-callback is_recording_events(opentelemetry:span_ctx()) -> boolean(). --callback set_attributes(opentelemetry:span_ctx(), opentelemetry:attributes()) -> ok. --callback add_events(opentelemetry:span_ctx(), opentelemetry:time_events()) -> ok. --callback set_status(opentelemetry:span_ctx(), opentelemetry:status()) -> ok. --callback update_name(opentelemetry:span_ctx(), opentelemetry:span_name()) -> ok. +-callback set_attribute(opentelemetry:span_ctx(), + opentelemetry:attribute_key(), + opentelemetry:attribute_value()) -> boolean(). +-callback set_attributes(opentelemetry:span_ctx(), opentelemetry:attributes()) -> boolean(). +-callback add_events(opentelemetry:span_ctx(), opentelemetry:time_events()) -> boolean(). +-callback set_status(opentelemetry:span_ctx(), opentelemetry:status()) -> boolean(). +-callback update_name(opentelemetry:span_ctx(), opentelemetry:span_name()) -> boolean(). start_span(Module, Name, Opts) -> Module:start_span(Name, Opts). @@ -57,6 +61,9 @@ get_ctx(Module, Span) -> is_recording_events(Module, SpanCtx) -> Module:is_recording_events(SpanCtx). +set_attribute(Module, SpanCtx, Key, Value) -> + Module:set_attribute(SpanCtx, Key, Value). + set_attributes(Module, SpanCtx, Attributes) -> Module:set_attributes(SpanCtx, Attributes). diff --git a/src/ot_span_ets.erl b/src/ot_span_ets.erl index 47452b42..6bd9ebce 100644 --- a/src/ot_span_ets.erl +++ b/src/ot_span_ets.erl @@ -30,6 +30,7 @@ finish_span/1, get_ctx/1, is_recording_events/1, + set_attribute/3, set_attributes/2, add_events/2, set_status/2, @@ -39,9 +40,6 @@ -record(state, {}). -%% table to store active spans --define(SPAN_TAB, otel_span_table). - start_link(Opts) -> gen_server:start_link(?MODULE, Opts, []). @@ -53,17 +51,16 @@ start_span(Name, Opts) -> SpanCtx. %% @doc Finish a span based on its context and send to reporter. --spec finish_span(opentelemetry:span_ctx()) -> ok. +-spec finish_span(opentelemetry:span_ctx()) -> boolean() | {error, term()}. 
finish_span(#span_ctx{span_id=SpanId, tracestate=Tracestate, trace_flags=TraceOptions}) when ?IS_SPAN_ENABLED(TraceOptions) -> case ets:take(?SPAN_TAB, SpanId) of [Span] -> - _Span1 = ot_span_utils:end_span(Span#span{tracestate=Tracestate}), - %% oc_reporter:store_span(Span1), - ok; + Span1 = ot_span_utils:end_span(Span#span{tracestate=Tracestate}), + ot_exporter:store_span(Span1); _ -> - ok + false end; finish_span(_) -> ok. @@ -73,24 +70,42 @@ get_ctx(_Span) -> #span_ctx{}. -spec is_recording_events(opentelemetry:span_ctx()) -> boolean(). -is_recording_events(_SpanCtx) -> - false. - --spec set_attributes(opentelemetry:span_ctx(), opentelemetry:attributes()) -> ok. -set_attributes(_SpanCtx, _Attributes) -> - ok. - --spec add_events(opentelemetry:span_ctx(), opentelemetry:time_events()) -> ok. -add_events(_SpanCtx, _TimeEvents) -> - ok. +is_recording_events(#span_ctx{is_recorded=IsRecorded}) -> + IsRecorded. + +-spec set_attribute(opentelemetry:span_ctx(), + opentelemetry:attribute_key(), + opentelemetry:attribute_value()) -> boolean(). +set_attribute(#span_ctx{span_id=SpanId}, Key, Value) -> + set_attributes(#span_ctx{span_id=SpanId}, [{Key, Value}]). + +-spec set_attributes(opentelemetry:span_ctx(), opentelemetry:attributes()) -> boolean(). +set_attributes(#span_ctx{span_id=SpanId}, NewAttributes) -> + case ets:lookup(?SPAN_TAB, SpanId) of + [Span=#span{attributes=Attributes}] -> + Span1 = Span#span{attributes=Attributes++NewAttributes}, + 1 =:= ets:select_replace(?SPAN_TAB, [{Span, [], [{const, Span1}]}]); + _ -> + false + end. + +-spec add_events(opentelemetry:span_ctx(), opentelemetry:time_events()) -> boolean(). +add_events(#span_ctx{span_id=SpanId}, NewTimeEvents) -> + case ets:lookup(?SPAN_TAB, SpanId) of + [Span=#span{time_events=TimeEvents}] -> + Span1 = Span#span{time_events=TimeEvents++NewTimeEvents}, + 1 =:= ets:select_replace(?SPAN_TAB, [{Span, [], [{const, Span1}]}]); + _ -> + false + end. 
--spec set_status(opentelemetry:span_ctx(), opentelemetry:status()) -> ok. -set_status(_SpanCtx, _Status) -> - ok. +-spec set_status(opentelemetry:span_ctx(), opentelemetry:status()) -> boolean(). +set_status(#span_ctx{span_id=SpanId}, Status) -> + ets:update_element(?SPAN_TAB, SpanId, {#span.status, Status}). --spec update_name(opentelemetry:span_ctx(), opentelemetry:span_name()) -> ok. -update_name(_SpanCtx, _SpanName) -> - ok. +-spec update_name(opentelemetry:span_ctx(), opentelemetry:span_name()) -> boolean(). +update_name(#span_ctx{span_id=SpanId}, Name) -> + ets:update_element(?SPAN_TAB, SpanId, {#span.name, Name}). %% diff --git a/src/ot_span_sup.erl b/src/ot_span_sup.erl index 58222f6e..439c06dc 100644 --- a/src/ot_span_sup.erl +++ b/src/ot_span_sup.erl @@ -32,11 +32,19 @@ start_link(Opts) -> start_child(ChildSpec) -> supervisor:start_child(?SERVER, ChildSpec). -init([_Opts]) -> +init([Opts]) -> + SweeperOpts = proplists:get_value(sweeper, Opts, #{}), + Sweeper = #{id => ot_span_sweeper, + start => {ot_span_sweeper, start_link, [SweeperOpts]}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [ot_span_sweeper]}, + SupFlags = #{strategy => one_for_one, intensity => 0, period => 1}, - ChildSpecs = [], + ChildSpecs = [Sweeper], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/src/ot_span_sweeper.erl b/src/ot_span_sweeper.erl new file mode 100644 index 00000000..bb3dbcad --- /dev/null +++ b/src/ot_span_sweeper.erl @@ -0,0 +1,163 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% @doc The span sweeper is a process that can be configured to remove,
+%% either by finishing or deleting, spans that are still active after a
+%% period of time.
+%% @end
+%%%-------------------------------------------------------------------------
+-module(ot_span_sweeper).
+
+-behaviour(gen_statem).
+
+-export([start_link/1]).
+
+-export([init/1,
+         callback_mode/0,
+         handle_event/4,
+         code_change/4,
+         terminate/3]).
+
+-export([storage_size/0]).
+
+-include("opentelemetry.hrl").
+-include_lib("kernel/include/logger.hrl").
+
+%% interval:     time between sweeps, as given to the state timeout
+%% strategy:     what to do with expired spans, see sweep_spans/2
+%% ttl:          span age after which a span is expired, in native time units
+%% storage_size: size of the span table at which overload handling starts
+-record(data, {interval :: integer() | infinity,
+               strategy :: drop | finish | failed_attribute_and_finish | fun((opentelemetry:span()) -> ok),
+               ttl :: integer() | infinity,
+               storage_size :: integer() | infinity}).
+
+%% @doc Size of the active span table as `{Objects, Bytes}': the `size' item
+%% of the table and its `memory' item multiplied by the external word size.
+storage_size() ->
+    {ets:info(?SPAN_TAB, size), ets:info(?SPAN_TAB, memory) * erlang:system_info({wordsize, external})}.
+
+%% @doc Start the sweeper, registered locally under the module name. `Opts'
+%% is the configuration map read by init/1.
+start_link(Opts) ->
+    gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
+
+%% gen_statem callback. Reads `interval', `strategy', `span_ttl' and
+%% `storage_size' from the configuration map, with defaults of 10 minutes,
+%% `drop', 30 minutes and `infinity', and schedules the first sweep.
+init([SweeperConfig]) ->
+    Interval = maps:get(interval, SweeperConfig, timer:minutes(10)),
+    Strategy = maps:get(strategy, SweeperConfig, drop),
+    TTL = maps:get(span_ttl, SweeperConfig, timer:minutes(30)),
+    StorageSize = maps:get(storage_size, SweeperConfig, infinity),
+    {ok, ready, #data{interval=Interval,
+                      strategy=Strategy,
+                      ttl=maybe_convert_time_unit(TTL),
+                      storage_size=StorageSize},
+     [hibernate, {state_timeout, Interval, sweep}]}.
+
+%% The TTL is configured in milliseconds and kept in native time units;
+%% `infinity' is kept as it is.
+maybe_convert_time_unit(infinity) ->
+    infinity;
+maybe_convert_time_unit(TTL) ->
+    erlang:convert_time_unit(TTL, millisecond, native).
+
+callback_mode() ->
+    handle_event_function.
+
+%% The only event acted upon is the sweep timer: run one sweep and schedule
+%% the next one. Everything else is ignored.
+handle_event(state_timeout, sweep, _, #data{interval=Interval} = Data) ->
+    do_gc(Data),
+    {keep_state_and_data, [hibernate, {state_timeout, Interval, sweep}]};
+handle_event(_, _, _, _Data) ->
+    keep_state_and_data.
+
+code_change(_, State, Data, _) ->
+    {ok, State, Data}.
+
+terminate(_Reason, _State, _Data) ->
+    ok.
+
+%%
+
+%% Run one sweep. Without a storage limit the configured TTL is used. With a
+%% limit, the size in bytes from storage_size/0 decides: at twice the limit
+%% or more the whole table is emptied, at the limit or more the TTL is cut
+%% to a tenth, below it the configured TTL is used.
+do_gc(#data{strategy=Strategy,
+            ttl=TTL,
+            storage_size=infinity}) ->
+    sweep_spans(Strategy, TTL);
+do_gc(#data{strategy=Strategy,
+            ttl=TTL,
+            storage_size=MaxSize}) ->
+
+    {_, StorageSize} = storage_size(),
+
+    if
+        StorageSize >= 2 * MaxSize ->
+            %% High overload kill storage.
+            ets:delete_all_objects(?SPAN_TAB);
+        StorageSize >= MaxSize ->
+            %% Low overload, reduce TTL
+            sweep_spans(Strategy, overload_ttl(TTL));
+        true ->
+            sweep_spans(Strategy, TTL)
+    end.
+
+overload_ttl(infinity) ->
+    infinity;
+overload_ttl(TTL) ->
+    TTL div 10.
+
+%% Apply the strategy to every span that is older than `TTL' (native time
+%% units); an `infinity' TTL means nothing ever expires.
+%%   drop:                        delete the expired spans
+%%   finish:                      take each expired span out of the table and
+%%                                finish it
+%%   failed_attribute_and_finish: same, after appending the
+%%                                `finished_by_sweeper' attribute
+%%   fun:                         call the fun with each expired span
+sweep_spans(_, infinity) ->
+    ok;
+sweep_spans(drop, TTL) ->
+    TooOld = erlang:monotonic_time() - TTL,
+    case ets:select_delete(?SPAN_TAB, expired_match_spec(TooOld, true)) of
+        0 ->
+            ok;
+        NumDeleted ->
+            ?LOG_INFO("sweep old spans: ttl=~p num_dropped=~p", [TTL, NumDeleted])
+    end;
+sweep_spans(finish, TTL) ->
+    Expired = select_expired(TTL),
+    [begin
+         case ets:take(?SPAN_TAB, SpanId) of
+             [] ->
+                 %% must have finished without needing to be swept
+                 ok;
+             [Span] ->
+                 finish_span(Span)
+         end
+     end || SpanId <- Expired],
+    ok;
+sweep_spans(failed_attribute_and_finish, TTL) ->
+    ExpiredSpanIds = select_expired(TTL),
+    [begin
+         case ets:take(?SPAN_TAB, SpanId) of
+             [] ->
+                 %% must have finished without needing to be swept
+                 ok;
+             [Span=#span{attributes=Attributes}] ->
+                 Span1 = Span#span{attributes=Attributes ++ [{<<"finished_by_sweeper">>, true}]},
+                 finish_span(Span1)
+         end
+     end || SpanId <- ExpiredSpanIds],
+    ok;
+sweep_spans(Fun, TTL) when is_function(Fun) ->
+    %% the strategy type declares a fun taking a span, so select the whole
+    %% expired records ('$_') instead of only their span ids, which is what
+    %% select_expired/1 returns
+    TooOld = erlang:monotonic_time() - TTL,
+    Expired = ets:select(?SPAN_TAB, expired_match_spec(TooOld, '$_')),
+    [Fun(Span) || Span <- Expired],
+    ok.
+
+%% dialyzer doesn't like the use of '_' in match specs, so warnings for the
+%% functions involved are turned off
+-dialyzer({nowarn_function, expired_match_spec/2}).
+-dialyzer({nowarn_function, finish_span/1}).
+-dialyzer({nowarn_function, select_expired/1}).
+
+%% Span ids of all spans in the table that started more than `TTL' (native
+%% time units) ago.
+select_expired(TTL) ->
+    Cutoff = erlang:monotonic_time() - TTL,
+    MatchSpec = expired_match_spec(Cutoff, '$1'),
+    ets:select(?SPAN_TAB, MatchSpec).
+
+%% Match spec selecting spans whose start time (first element of the
+%% `start_time' tuple) is below `Time'. `Return' is the result produced for
+%% each match; '$1' stands for the span id.
+expired_match_spec(Time, Return) ->
+    Head = #span{span_id='$1', start_time={'$2', '_'}, _='_'},
+    Guard = {'<', '$2', Time},
+    [{Head, [Guard], [Return]}].
+
+%% End a span that was taken out of the table and hand it to the exporter.
+%% There is no span ctx here, so the tracestate stored in the span itself is
+%% the one that is kept.
+finish_span(#span{} = Span) ->
+    ot_exporter:store_span(ot_span_utils:end_span(Span)).
diff --git a/test/opentelemetry_SUITE.erl b/test/opentelemetry_SUITE.erl
index 8000d9e4..b704de0a 100644
--- a/test/opentelemetry_SUITE.erl
+++ b/test/opentelemetry_SUITE.erl
@@ -6,6 +6,7 @@
 -include_lib("common_test/include/ct.hrl").
 
 -include("opentelemetry.hrl").
+-include("ot_test_utils.hrl").
 
 all() ->
     [{group, ot_ctx_pdict},
@@ -28,19 +29,28 @@ end_per_suite(_Config) ->
 init_per_group(CtxModule, Config) ->
     application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []},
                                                                      ctx => {CtxModule, []}}}),
+    application:set_env(opentelemetry, exporter, [{exporters, []},
+                                                  {scheduled_delay_ms, 1}]),
     {ok, _} = application:ensure_all_started(opentelemetry),
+
     Config.
 
 end_per_group(_, _Config) ->
     ok = application:stop(opentelemetry).
 
 init_per_testcase(_, Config) ->
-    Config.
+    %% adds an exporter for a new table
+    %% spans will be exported to a separate table for each of the test cases
+    Tid = ets:new(exported_spans, [public, bag]),
+    ot_exporter:register(ot_exporter_tab, Tid),
+    [{tid, Tid} | Config].
 
 end_per_testcase(_, _Config) ->
     ok.
-child_spans(_Config) -> +child_spans(Config) -> + Tid = ?config(tid, Config), + %% start a span and 2 children SpanCtx1 = ot_tracer:start_span(<<"span-1">>), SpanCtx2 = ot_tracer:start_span(<<"span-2">>), @@ -50,6 +60,8 @@ child_spans(_Config) -> ?assertMatch(SpanCtx3, ot_tracer:current_span_ctx()), ot_tracer:finish(), + assert_exported(Tid, SpanCtx3), + %% 2nd span should be the current span ctx now ?assertMatch(SpanCtx2, ot_tracer:current_span_ctx()), @@ -67,9 +79,13 @@ child_spans(_Config) -> %% finish first and no span should be current ctx ot_tracer:finish(), - ?assertMatch(undefined, ot_tracer:current_span_ctx()). + ?assertMatch(undefined, ot_tracer:current_span_ctx()), + + assert_all_exported(Tid, [SpanCtx1, SpanCtx2, SpanCtx3, SpanCtx4]). + +non_default_tracer(Config) -> + Tid = ?config(tid, Config), -non_default_tracer(_Config) -> SpanCtx1 = ot_tracer:start_span(<<"span-1">>), ?assertNotMatch(#span_ctx{trace_id=0, span_id=0}, SpanCtx1), @@ -79,4 +95,27 @@ non_default_tracer(_Config) -> ?assertMatch(#span_ctx{trace_id=0, span_id=0}, SpanCtx2), ?assertMatch(SpanCtx2, ot_tracer:current_span_ctx()), - ot_tracer:finish(). + ot_tracer:finish(), + + assert_exported(Tid, SpanCtx1), + assert_not_exported(Tid, SpanCtx2). + +%% + +assert_all_exported(Tid, SpanCtxs) -> + [assert_exported(Tid, SpanCtx) || SpanCtx <- SpanCtxs]. + +assert_exported(Tid, #span_ctx{trace_id=TraceId, + span_id=SpanId}) -> + ?UNTIL([] =/= ets:match(Tid, #span{trace_id=TraceId, + span_id=SpanId, + _='_'})). + +assert_not_exported(Tid, #span_ctx{trace_id=TraceId, + span_id=SpanId}) -> + %% sleep so exporter has run before we check + %% since we can't do like when checking it exists with UNTIL + timer:sleep(100), + ?assertMatch([], ets:match(Tid, #span{trace_id=TraceId, + span_id=SpanId, + _='_'})). 
diff --git a/test/ot_exporter_SUITE.erl b/test/ot_exporter_SUITE.erl
new file mode 100644
index 00000000..0fe836dc
--- /dev/null
+++ b/test/ot_exporter_SUITE.erl
@@ -0,0 +1,29 @@
+%% Common Test suite for the exporter state machine (ot_exporter).
+-module(ot_exporter_SUITE).
+
+-compile(export_all).
+
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-include("opentelemetry.hrl").
+-include("ot_sampler.hrl").
+
+all() ->
+    [exporting_timeout_test].
+
+%% verifies that after the runner has to be killed for taking too long
+%% that everything is still functional and the exporter does not crash
+exporting_timeout_test(_Config) ->
+    %% the exporter is linked to this process; trapping exits turns a crash
+    %% of the exporter into an 'EXIT' message that can be received below
+    process_flag(trap_exit, true),
+
+    %% the only exporter sleeps for 10 minutes while both the export
+    %% interval and the exporting timeout are 1ms, so the runner has to be
+    %% killed over and over
+    {ok, Pid} = ot_exporter:start_link([{exporters, [fun(_, _) -> timer:sleep(timer:minutes(10)) end]},
+                                        {exporting_timeout_ms, 1},
+                                        {scheduled_delay_ms, 1}]),
+
+    %% the test passes when the exporter has not exited within 100ms
+    receive
+        {'EXIT', Pid, _} ->
+            ct:fail(exporter_crash)
+    after
+        100 ->
+            ok
+    end.
diff --git a/test/ot_sweeper_SUITE.erl b/test/ot_sweeper_SUITE.erl
new file mode 100644
index 00000000..d2367d9e
--- /dev/null
+++ b/test/ot_sweeper_SUITE.erl
@@ -0,0 +1,175 @@
+%%% ---------------------------------------------------------------------------
+%%% @doc
+%%% Common Test suite for the span sweeper: one test case for the storage
+%%% size limit and one for each built-in sweep strategy.
+%%% @end
+%%% ---------------------------------------------------------------------------
+-module(ot_sweeper_SUITE).
+
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-include("ot_test_utils.hrl").
+-include("opentelemetry.hrl").
+
+%% apart from storage_size, the test case names are also the sweeper
+%% strategies used by them (see init_per_testcase/2)
+all() ->
+    [storage_size,
+     drop,
+     finish,
+     failed_attribute_and_finish].
+
+%% the application is only loaded here; it is started for each test case
+%% once its environment has been set
+init_per_suite(Config) ->
+    application:load(opentelemetry),
+    Config.
+
+end_per_suite(_Config) ->
+    application:unload(opentelemetry),
+    ok.
+
+%% Start the application with a sweeper that runs every 250ms and expires
+%% spans after 500ms, and with an exporter that runs every 1ms and sends the
+%% spans as messages to the process calling this function.
+%%
+%% storage_size: `finish' strategy plus a storage size limit of 100
+init_per_testcase(storage_size, Config) ->
+    application:set_env(opentelemetry, sweeper, #{interval => 250,
+                                                  strategy => finish,
+                                                  span_ttl => 500,
+                                                  storage_size => 100}),
+    application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []},
+                                                                     ctx => {ot_ctx_pdict, []}}}),
+    application:set_env(opentelemetry, exporter, [{exporters, [{ot_exporter_pid, self()}]},
+                                                  {scheduled_delay_ms, 1}]),
+    {ok, _} = application:ensure_all_started(opentelemetry),
+    Config;
+%% every other test case: the test case name is used as the strategy and no
+%% storage size limit is set
+init_per_testcase(Type, Config) ->
+    application:set_env(opentelemetry, sweeper, #{interval => 250,
+                                                  strategy => Type,
+                                                  span_ttl => 500}),
+    application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []},
+                                                                     ctx => {ot_ctx_pdict, []}}}),
+    application:set_env(opentelemetry, exporter, [{exporters, [{ot_exporter_pid, self()}]},
+                                                  {scheduled_delay_ms, 1}]),
+    {ok, _} = application:ensure_all_started(opentelemetry),
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok = application:stop(opentelemetry),
+    ok.
+
+%% With a storage size limit in place, two unfinished spans are expected to
+%% disappear from the span table without span-1 ever being exported.
+storage_size(_Config) ->
+    SpanName1 = <<"span-1">>,
+    SpanCtx = ot_tracer:start_span(SpanName1),
+
+    ChildSpanName1 = <<"child-span-1">>,
+    ChildSpanCtx = ot_tracer:start_span(ChildSpanName1),
+
+    %% the child is in the active span table and points at its parent
+    [ChildSpanData] = ets:lookup(?SPAN_TAB, ChildSpanCtx#span_ctx.span_id),
+    ?assertEqual(ChildSpanName1, ChildSpanData#span.name),
+    ?assertEqual(SpanCtx#span_ctx.span_id, ChildSpanData#span.parent_span_id),
+
+    %% wait until the sweeper sweeps away the parent span
+    ?UNTIL(ets:tab2list(?SPAN_TAB) =:= []),
+
+    %% sleep long enough that the exporter would have run again for sure
+    timer:sleep(10),
+
+    %% should be no exported spans
+    ?assertEqual(no_span, receive
+                              {span, #span{name=N}} when N =:= SpanName1 ->
+                                  got_span
+                          after
+                              0 ->
+                                  no_span
+                          end).
+
+%% drop: after one finish/0 call and a sweep, child-span-1 must be exported
+%% with start and end times, while span-1 must not reach the exporter.
+drop(_Config) ->
+    SpanName1 = <<"span-1">>,
+    SpanCtx = ot_tracer:start_span(SpanName1),
+
+    ChildSpanName1 = <<"child-span-1">>,
+    ChildSpanCtx = ot_tracer:start_span(ChildSpanName1),
+
+    [ChildSpanData] = ets:lookup(?SPAN_TAB, ChildSpanCtx#span_ctx.span_id),
+    ?assertEqual(ChildSpanName1, ChildSpanData#span.name),
+    ?assertEqual(SpanCtx#span_ctx.span_id, ChildSpanData#span.parent_span_id),
+
+    ot_tracer:finish(),
+
+    %% wait until the sweeper sweeps away the parent span
+    ?UNTIL(ets:tab2list(?SPAN_TAB) =:= []),
+
+    ot_tracer:finish(),
+
+    receive
+        {span, #span{name = ChildSpanName1} = ChildSpan} ->
+            assert_start_and_end_time(ChildSpan)
+    after
+        1000 -> ct:fail("Did not receive any message after 1s")
+    end,
+
+    %% sleep long enough that the exporter would have run again for sure
+    timer:sleep(10),
+
+    %% should be no exported span for span-1
+    Received = receive
+                   {span, #span{name = SpanName1}} -> got_span
+               after 0 -> no_span
+               end,
+    ?assertEqual(no_span, Received).
+
+%% finish: finish/0 is called once, yet both spans must be exported.
+finish(_Config) ->
+    SpanName1 = <<"span-1">>,
+    _SpanCtx = ot_tracer:start_span(SpanName1),
+
+    ChildSpanName1 = <<"child-span-1">>,
+    _ChildSpanCtx = ot_tracer:start_span(ChildSpanName1),
+    ot_tracer:finish(),
+
+    %% wait until the sweeper sweeps away the parent span
+    ?UNTIL(ets:tab2list(?SPAN_TAB) =:= []),
+
+    %% expect one exported span per name, each with start and end time set
+    lists:foreach(fun(Name) ->
+                          receive
+                              {span, #span{name = Name} = Span} ->
+                                  assert_start_and_end_time(Span)
+                          after
+                              1000 -> ct:fail("Did not receive any message after 1s")
+                          end
+                  end, [SpanName1, ChildSpanName1]).
+
+%% Assert that start_time and end_time of the span are both set, i.e. each is
+%% a two-tuple of integers.
+assert_start_and_end_time(#span{start_time = StartTime, end_time = EndTime}) ->
+    ?assertMatch({ST, O} when is_integer(ST) andalso is_integer(O), StartTime),
+    ?assertMatch({ST, O} when is_integer(ST) andalso is_integer(O), EndTime).
+
+%% failed_attribute_and_finish: after one finish/0 call and a sweep, span-1
+%% must be exported with `[{<<"finished_by_sweeper">>, true}]' as its
+%% attributes and with both start and end time set.
+failed_attribute_and_finish(_Config) ->
+    SpanName1 = <<"span-1">>,
+    SpanCtx = ot_tracer:start_span(SpanName1),
+
+    ChildSpanName1 = <<"child-span-1">>,
+    ChildSpanCtx = ot_tracer:start_span(ChildSpanName1),
+
+    [ChildSpanData] = ets:lookup(?SPAN_TAB, ChildSpanCtx#span_ctx.span_id),
+    ?assertEqual(ChildSpanName1, ChildSpanData#span.name),
+    ?assertEqual(SpanCtx#span_ctx.span_id, ChildSpanData#span.parent_span_id),
+
+    ot_tracer:finish(),
+
+    %% wait until the sweeper sweeps away the parent span
+    ?UNTIL(ets:tab2list(?SPAN_TAB) =:= []),
+
+    receive
+        {span, #span{name = SpanName1, attributes = Attributes} = Span} ->
+            %% should have attribute finished_by_sweeper
+            ?assertMatch([{<<"finished_by_sweeper">>, true}], Attributes),
+
+            %% start and end time must both be set (two-tuples of integers)
+            ?assertMatch({ST, O} when is_integer(ST) andalso is_integer(O), Span#span.start_time),
+            ?assertMatch({ST, O} when is_integer(ST) andalso is_integer(O), Span#span.end_time)
+    after
+        1000 -> ct:fail("Did not receive any message after 1s")
+    end.
diff --git a/test/ot_test_utils.hrl b/test/ot_test_utils.hrl
new file mode 100644
index 00000000..005872ee
--- /dev/null
+++ b/test/ot_test_utils.hrl
@@ -0,0 +1,17 @@
+%% Evaluate the boolean expression X up to 10 times, sleeping 100ms after each
+%% `false' result (about 1 second in total). Evaluates to `ok' as soon as X is
+%% `true'; calls ct:fail(timeout) once the attempts are used up.
+%% The fun's own variables carry a `__' suffix so they are unlikely to shadow
+%% variables that X refers to at the call site.
+-define(UNTIL(X), (fun Until__(Attempt__) when Attempt__ =:= 10 ->
+                           ct:fail(timeout);
+                       Until__(Attempt__) ->
+                           case (X) of
+                               true ->
+                                   ok;
+                               false ->
+                                   timer:sleep(100),
+                                   Until__(Attempt__ + 1)
+                           end
+                   end)(0)).
+