From 134defb4d5b5fa9af8dffd77703ce397e7ee3553 Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Thu, 7 Nov 2019 09:15:59 -0700 Subject: [PATCH] wip: restructure tracer around registry This is to be split into separate OTP applications to make the distinction clearer. For now the SDK registry is started when opentelemetry application is started. The registry is responsible for tracking the tracers, the sampler and span processors. The registry also currently sets a ctx and span module to use. I think this will change when context is split out. I'd also like to consider not having the span module be configurable in the SDK but always use ot_span_ets and if the user needs a fancy form of span handling they can replace the SDK if they can't get by with using span processors. --- src/opentelemetry.app.src | 10 +- src/opentelemetry.erl | 14 +- src/opentelemetry_app.erl | 13 +- src/opentelemetry_sup.erl | 34 +- src/ot_batch_processor.erl | 293 ++++++++++++++++++ src/ot_exporter.erl | 244 +-------------- src/ot_exporter_pid.erl | 8 +- src/ot_exporter_stdout.erl | 8 +- src/ot_exporter_tab.erl | 8 +- src/ot_registry_api.erl | 27 ++ src/ot_registry_sup.erl | 57 ++++ src/ot_registry_tracer.erl | 90 ++++++ src/ot_span_ets.erl | 15 +- src/ot_span_processor.erl | 27 ++ src/ot_span_sup.erl | 18 +- src/ot_span_sweeper.erl | 2 +- src/ot_tracer.erl | 58 +--- src/ot_tracer.hrl | 7 + src/ot_tracer_default.erl | 31 +- test/opentelemetry_SUITE.erl | 42 ++- ...SUITE.erl => ot_batch_processor_SUITE.erl} | 21 +- test/ot_sweeper_SUITE.erl | 17 +- 22 files changed, 649 insertions(+), 395 deletions(-) create mode 100644 src/ot_batch_processor.erl create mode 100644 src/ot_registry_api.erl create mode 100644 src/ot_registry_sup.erl create mode 100644 src/ot_registry_tracer.erl create mode 100644 src/ot_span_processor.erl create mode 100644 src/ot_tracer.hrl rename test/{ot_exporter_SUITE.erl => ot_batch_processor_SUITE.erl} (55%) diff --git a/src/opentelemetry.app.src b/src/opentelemetry.app.src index 6fd83a346..2c74fba70 100644 --- a/src/opentelemetry.app.src +++ b/src/opentelemetry.app.src @@ -8,11 +8,11 @@ stdlib, wts ]}, - {env, [{tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {ot_ctx_pdict, []}}}}, - {sampler, {always_on, #{}}}, - {exporter, [{exporters, []}, - {scheduled_delay_ms, 30000}]}]}, + {env, [{sampler, {always_on, #{}}}, + {processors, [%% #{id => my_processor, + %% module => ot_batch_processor, + %% config => #{}} + ]}]}, {modules, []}, {licenses, ["Apache 2.0"]}, diff --git a/src/opentelemetry.erl b/src/opentelemetry.erl index 92ae32522..53f451c73 100644 --- a/src/opentelemetry.erl +++ b/src/opentelemetry.erl @@ -28,7 +28,10 @@ %%%------------------------------------------------------------------------- -module(opentelemetry). --export([generate_trace_id/0, +-export([set_default_tracer_registry/1, + get_tracer/0, + get_tracer/1, + generate_trace_id/0, generate_span_id/0]). -include("opentelemetry.hrl"). @@ -93,6 +96,15 @@ -type http_headers() :: [{unicode:unicode_binary(), unicode:unicode_binary()}]. +set_default_tracer_registry(TracerRegistry) -> + persistent_term:put({?MODULE, default_tracer_registry}, TracerRegistry). + +get_tracer() -> + (persistent_term:get({?MODULE, default_tracer_registry}, ot_registry_api)):get(). + +get_tracer(Name) -> + (persistent_term:get({?MODULE, default_tracer_registry}, ot_registry_api)):get(Name). + %%-------------------------------------------------------------------- %% @doc %% Generates a 128 bit random integer to use as a trace id. diff --git a/src/opentelemetry_app.erl b/src/opentelemetry_app.erl index f7e0cf216..8084b1283 100644 --- a/src/opentelemetry_app.erl +++ b/src/opentelemetry_app.erl @@ -23,18 +23,7 @@ start(_StartType, _StartArgs) -> Opts = application:get_all_env(opentelemetry), - - {sampler, {Sampler, SamplerOpts}} = lists:keyfind(sampler, 1, Opts), - SamplerFun = ot_sampler:setup(Sampler, SamplerOpts), - - %% The default sampler implementation must be passed to the tracer chosen. - %% Since the tracer is started after the rest of the supervision tree has started - %% and we only get its children, we must instantiate the sampler before anything else. - %% The sampler turns out to be instantiated before any supervision tree is started. - {tracer, {Tracer, TracerOpts}} = lists:keyfind(tracer, 1, Opts), - TracerChildren = ot_tracer:setup(Tracer, TracerOpts, SamplerFun), - - opentelemetry_sup:start_link(TracerChildren, Opts). + opentelemetry_sup:start_link(Opts). stop(_State) -> ok. diff --git a/src/opentelemetry_sup.erl b/src/opentelemetry_sup.erl index a3a4eebb2..af690643a 100644 --- a/src/opentelemetry_sup.erl +++ b/src/opentelemetry_sup.erl @@ -19,32 +19,28 @@ -behaviour(supervisor). --export([start_link/2]). +-export([start_link/1]). -export([init/1]). -define(SERVER, ?MODULE). -start_link(Children, Opts) -> - supervisor:start_link({local, ?SERVER}, ?MODULE, [Children, Opts]). +start_link(Opts) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [Opts]). -init([Children, Opts]) -> +init([Opts]) -> SupFlags = #{strategy => one_for_one, - intensity => 0, - period => 1}, - - ExporterOpts = proplists:get_value(exporter, Opts, []), - Exporter = #{id => ot_exporter, - start => {ot_exporter, start_link, [ExporterOpts]}, - restart => permanent, - shutdown => 1000, - type => worker, - modules => [ot_exporter]}, - - ChildSpecs = [#{id => ot_span_sup, - start => {ot_span_sup, start_link, [Opts]}, - type => supervisor}, - Exporter | Children], + intensity => 1, + period => 5}, + + RegistrySup = #{id => ot_registry_sup, + start => {ot_registry_sup, start_link, [Opts]}, + restart => permanent, + shutdown => 1000, + type => supervisor, + modules => [ot_registry_sup]}, + + ChildSpecs = [RegistrySup], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/src/ot_batch_processor.erl b/src/ot_batch_processor.erl new file mode 100644 index 000000000..e367fb1ae --- /dev/null +++ b/src/ot_batch_processor.erl @@ -0,0 +1,293 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc This module has the behaviour that each exporter must implement +%% and creates the buffer of trace spans to be exported. +%% +%% The exporter process can be configured to export the current finished +%% spans based on timeouts and the size of the finished spans table. +%% +%% Timeouts: +%% exporting_timeout_ms: How long to let the exports run before killing. +%% check_table_size_ms: Timeout to check the size of the export table. +%% scheduled_delay_ms: How often to trigger running the exporters. +%% +%% The size limit of the current table where finished spans are stored can +%% be configured with the `max_queue_size' option. +%% @end +%%%----------------------------------------------------------------------- +-module(ot_batch_processor). + +-behaviour(gen_statem). +-behaviour(ot_span_processor). + +-export([start_link/1, + on_start/2, + on_end/2, + set_exporter/1, + set_exporter/2]). + +-export([init/1, + callback_mode/0, + idle/3, + exporting/3, + terminate/3]). + +-include("opentelemetry.hrl"). +-include_lib("kernel/include/logger.hrl"). + +%% behaviour for exporters to implement +-type opts() :: term(). + +%% Do any initialization of the exporter here and return configuration +%% that will be passed along with a list of spans to the `export' function. +-callback init(term()) -> opts(). + +%% This function is called when the configured interval expires with any +%% spans that have been collected so far and the configuration returned in `init'. +%% Do whatever needs to be done to export each span here, the caller will block +%% until it returns. +-callback export(ets:tid(), opts()) -> ok | success | failed_not_retryable | failed_retryable. + +-record(data, {exporter :: {module(), term()} | undefined, + handed_off_table :: atom() | undefined, + runner_pid :: pid() | undefined, + max_queue_size :: integer() | infinity, + exporting_timeout_ms :: integer(), + check_table_size_ms :: integer() | infinity, + scheduled_delay_ms :: integer()}). + +-define(CURRENT_TABLES_KEY, {?MODULE, current_table}). +-define(TABLE_1, ot_export_table1). +-define(TABLE_2, ot_export_table2). +-define(CURRENT_TABLE, persistent_term:get(?CURRENT_TABLES_KEY)). + +-define(DEFAULT_MAX_QUEUE_SIZE, 2048). +-define(DEFAULT_SCHEDULED_DEPLAY_MS, timer:seconds(5)). +-define(DEFAULT_EXPORTER_TIMEOUT_MS, timer:minutes(5)). +-define(DEFAULT_CHECK_TABLE_SIZE_MS, timer:seconds(1)). + +-define(ENABLED_KEY, {?MODULE, enabled_key}). + +start_link(Opts) -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []). + +%% @equiv set_exporter(Exporter, []). +set_exporter(Exporter) -> + set_exporter(Exporter, []). + +%% @doc Sets the batch exporter `Exporter'. +-spec set_exporter(module(), term()) -> ok. +set_exporter(Exporter, Options) -> + gen_statem:call(?MODULE, {set_exporter, {Exporter, Options}}). + +-spec on_start(opencensus:span(), ot_span_processor:processor_config()) -> opencensus:span(). +on_start(Span, _) -> + Span. + +-spec on_end(opencensus:span(), ot_span_processor:processor_config()) + -> true | dropped | {error, invalid_span} | {error, no_export_buffer}. +on_end(Span=#span{}, _) -> + do_insert(Span); +on_end(_, _) -> + {error, invalid_span}. + +init([Args]) -> + process_flag(trap_exit, true), + + SizeLimit = proplists:get_value(max_queue_size, Args, ?DEFAULT_MAX_QUEUE_SIZE), + ExportingTimeout = proplists:get_value(exporting_timeout_ms, Args, ?DEFAULT_EXPORTER_TIMEOUT_MS), + ScheduledDelay = proplists:get_value(scheduled_delay_ms, Args, ?DEFAULT_SCHEDULED_DEPLAY_MS), + CheckTableSize = proplists:get_value(check_table_size_ms, Args, ?DEFAULT_CHECK_TABLE_SIZE_MS), + + Exporter = init_exporter(proplists:get_value(exporter, Args, undefined)), + + _Tid1 = new_export_table(?TABLE_1), + _Tid2 = new_export_table(?TABLE_2), + persistent_term:put(?CURRENT_TABLES_KEY, ?TABLE_1), + + enable(), + + {ok, idle, #data{exporter=Exporter, + handed_off_table=undefined, + max_queue_size=case SizeLimit of + infinity -> infinity; + _ -> SizeLimit div erlang:system_info(wordsize) + end, + exporting_timeout_ms=ExportingTimeout, + check_table_size_ms=CheckTableSize, + scheduled_delay_ms=ScheduledDelay}}. + +callback_mode() -> + [state_functions, state_enter]. + +idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) -> + {keep_state_and_data, [{{timeout, export_spans}, SendInterval, export_spans}]}; +idle(_, export_spans, Data) -> + {next_state, exporting, Data}; +idle(EventType, Event, Data) -> + handle_event_(idle, EventType, Event, Data). + +exporting({timeout, export_spans}, export_spans, _) -> + {keep_state_and_data, [postpone]}; +exporting(enter, _OldState, Data=#data{exporting_timeout_ms=ExportingTimeout, + scheduled_delay_ms=SendInterval}) -> + {OldTableName, RunnerPid} = export_spans(Data), + {keep_state, Data#data{runner_pid=RunnerPid, + handed_off_table=OldTableName}, + [{state_timeout, ExportingTimeout, exporting_timeout}, + {{timeout, export_spans}, SendInterval, export_spans}]}; +exporting(state_timeout, exporting_timeout, Data=#data{handed_off_table=ExportingTable}) -> + %% kill current exporting process because it is taking too long + %% which deletes the exporting table, so create a new one and + %% repeat the state to force another span exporting immediately + Data1 = kill_runner(Data), + new_export_table(ExportingTable), + {repeat_state, Data1}; +%% important to verify runner_pid and FromPid are the same in case it was sent +%% after kill_runner was called but before it had done the unlink +exporting(info, {'EXIT', FromPid, _}, Data=#data{runner_pid=FromPid}) -> + complete_exporting(Data); +%% important to verify runner_pid and FromPid are the same in case it was sent +%% after kill_runner was called but before it had done the unlink +exporting(info, {completed, FromPid}, Data=#data{runner_pid=FromPid}) -> + complete_exporting(Data); +exporting(EventType, Event, Data) -> + handle_event_(exporting, EventType, Event, Data). + +handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) -> + keep_state_and_data; +handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=MaxQueueSize}) -> + case ets:info(?CURRENT_TABLE, size) of + M when M >= MaxQueueSize -> + disable(), + keep_state_and_data; + _ -> + enable(), + keep_state_and_data + end; +handle_event_(_, {call, From}, {set_exporter, Exporter}, Data=#data{exporter=OldExporter}) -> + shutdown_exporter(OldExporter), + {keep_state, Data#data{exporter=init_exporter(Exporter)}, [{reply, From, ok}]}; +handle_event_(_, _, _, _) -> + keep_state_and_data. + +terminate(_, _, _Data) -> + %% TODO: flush buffers to exporter + ok. + +%% + +enable()-> + persistent_term:put(?ENABLED_KEY, true). + +disable() -> + persistent_term:put(?ENABLED_KEY, false). + +is_enabled() -> + persistent_term:get(?ENABLED_KEY, true). + +do_insert(Span) -> + try + case is_enabled() of + true -> + ets:insert(?CURRENT_TABLE, Span); + _ -> + dropped + end + catch + error:badarg -> + {error, no_batch_span_processor}; + _:_ -> + {error, other} + end. + +complete_exporting(Data=#data{handed_off_table=ExportingTable}) + when ExportingTable =/= undefined -> + new_export_table(ExportingTable), + {next_state, idle, Data#data{runner_pid=undefined, + handed_off_table=undefined}}. + +kill_runner(Data=#data{runner_pid=RunnerPid}) -> + erlang:unlink(RunnerPid), + erlang:exit(RunnerPid, kill), + Data#data{runner_pid=undefined, + handed_off_table=undefined}. + +new_export_table(Name) -> + ets:new(Name, [public, named_table, {write_concurrency, true}, duplicate_bag]). + +init_exporter(undefined) -> + undefined; +init_exporter({ExporterModule, Config}) when is_atom(ExporterModule) -> + case ExporterModule:init(Config) of + {ok, ExporterConfig} -> + {ExporterModule, ExporterConfig}; + ignore -> + undefined + end; +init_exporter(ExporterModule) when is_atom(ExporterModule) -> + init_exporter({ExporterModule, []}). + +shutdown_exporter(undefined) -> + ok; +shutdown_exporter({ExporterModule, Config}) -> + ExporterModule:shutdown(Config). + +export_spans(#data{exporter=Exporter}) -> + CurrentTable = ?CURRENT_TABLE, + NewCurrentTable = case CurrentTable of + ?TABLE_1 -> + ?TABLE_2; + ?TABLE_2 -> + ?TABLE_1 + end, + + %% an atom is a single word so this does not trigger a global GC + persistent_term:put(?CURRENT_TABLES_KEY, NewCurrentTable), + %% set the table to accept inserts + enable(), + + Self = self(), + RunnerPid = erlang:spawn_link(fun() -> send_spans(Self, Exporter) end), + ets:give_away(CurrentTable, RunnerPid, export), + {CurrentTable, RunnerPid}. + +%% Additional benefit of using a separate process is calls to `register` won't +%% timeout if the actual exporting takes longer than the call timeout +send_spans(FromPid, Exporter) -> + receive + {'ETS-TRANSFER', Table, FromPid, export} -> + TableName = ets:rename(Table, current_send_table), + export(Exporter, TableName), + ets:delete(TableName), + completed(FromPid) + end. + +completed(FromPid) -> + FromPid ! {completed, self()}. + +export(undefined, _) -> + true; +export({Exporter, Config}, SpansTid) -> + %% don't let a exporter exception crash us + %% and return true if exporter failed + try + Exporter:export(SpansTid, Config) =:= failed_not_retryable + catch + Class:Exception:StackTrace -> + ?LOG_INFO("exporter threw exception: exporter=~p ~p:~p stacktrace=~p", + [Exporter, Class, Exception, StackTrace]), + true + end. diff --git a/src/ot_exporter.erl b/src/ot_exporter.erl index b1779e74a..67ec136af 100644 --- a/src/ot_exporter.erl +++ b/src/ot_exporter.erl @@ -12,247 +12,11 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %% -%% @doc This module has the behaviour that each exporter must implement -%% and creates the buffer of trace spans to be exported. -%% -%% The exporter process can be configured to export the current finished -%% spans based on timeouts and the size of the finished spans table. -%% -%% Timeouts: -%% exporting_timeout_ms: How long to let the exports run before killing. -%% check_table_size_ms: Timeout to check the size of the export table. -%% scheduled_delay_ms: How often to trigger running the exporters. -%% -%% The size limit of the current table where finished spans are stored can -%% be configured with the `max_queue_size' option. +%% @doc %% @end %%%----------------------------------------------------------------------- -module(ot_exporter). --behaviour(gen_statem). - --compile({no_auto_import, [register/2]}). - --export([start_link/1, - store_span/1, - register/1, - register/2]). - --export([init/1, - callback_mode/0, - idle/3, - exporting/3, - terminate/3]). - --include("opentelemetry.hrl"). --include_lib("kernel/include/logger.hrl"). - -%% behaviour for exporters to implement --type opts() :: term(). - -%% Do any initialization of the exporter here and return configuration -%% that will be passed along with a list of spans to the `export' function. --callback init(term()) -> opts(). - -%% This function is called when the configured interval expires with any -%% spans that have been collected so far and the configuration returned in `init'. -%% Do whatever needs to be done to export each span here, the caller will block -%% until it returns. --callback export(ets:tid(), opts()) -> ok | success | failed_not_retryable | failed_retryable. - --record(data, {exporters :: [{module(), term()}], - handed_off_table :: atom() | undefined, - runner_pid :: pid() | undefined, - max_queue_size :: integer() | infinity, - exporting_timeout_ms :: integer(), - check_table_size_ms :: integer() | infinity, - scheduled_delay_ms :: integer()}). - --define(CURRENT_TABLES_KEY, {?MODULE, current_table}). --define(TABLE_1, ot_export_table1). --define(TABLE_2, ot_export_table2). --define(CURRENT_TABLE, persistent_term:get(?CURRENT_TABLES_KEY)). - --define(DEFAULT_MAX_QUEUE_SIZE, 2048). --define(DEFAULT_SCHEDULED_DEPLAY_MS, timer:seconds(5)). --define(DEFAULT_EXPORTING_TIMEOUT, timer:minutes(5)). --define(DEFAULT_CHECK_TABLE_SIZE_INTERVAL, infinity). - -start_link(Opts) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - -%% @equiv register(Exporter, []). -register(Exporter) -> - register(Exporter, []). - -%% @doc Register new traces exporter `Exporter' with `Config'. --spec register(module(), term()) -> ok. -register(Exporter, Options) -> - gen_statem:call(?MODULE, {register, init_exporter({Exporter, Options})}). - --spec store_span(opencensus:span()) -> true | {error, invalid_span} | {error, no_export_buffer}. -store_span(Span=#span{}) -> - try - ets:insert(?CURRENT_TABLE, Span) - catch - error:badarg -> - {error, no_export_buffer} - end; -store_span(_) -> - {error, invalid_span}. - -init([Args]) -> - process_flag(trap_exit, true), - - SizeLimit = proplists:get_value(max_queue_size, Args, ?DEFAULT_MAX_QUEUE_SIZE), - ExportingTimeout = proplists:get_value(exporting_timeout_ms, Args, ?DEFAULT_EXPORTING_TIMEOUT), - ScheduledDelay = proplists:get_value(scheduled_delay_ms, Args, ?DEFAULT_SCHEDULED_DEPLAY_MS), - CheckTableSize = proplists:get_value(check_table_size_ms, Args, ?DEFAULT_CHECK_TABLE_SIZE_INTERVAL), - Exporters = [init_exporter(Config) || Config <- proplists:get_value(exporters, Args, [])], - - _Tid1 = new_export_table(?TABLE_1), - _Tid2 = new_export_table(?TABLE_2), - persistent_term:put(?CURRENT_TABLES_KEY, ?TABLE_1), - - {ok, idle, #data{exporters=Exporters, - handed_off_table=undefined, - max_queue_size=case SizeLimit of - infinity -> infinity; - _ -> SizeLimit div erlang:system_info(wordsize) - end, - exporting_timeout_ms=ExportingTimeout, - check_table_size_ms=CheckTableSize, - scheduled_delay_ms=ScheduledDelay}}. - -callback_mode() -> - [state_functions, state_enter]. - -idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) -> - {keep_state_and_data, [{{timeout, export_spans}, SendInterval, export_spans}]}; -idle(_, export_spans, Data) -> - {next_state, exporting, Data}; -idle(EventType, Event, Data) -> - handle_event_(idle, EventType, Event, Data). - -exporting({timeout, export_spans}, export_spans, _) -> - {keep_state_and_data, [postpone]}; -exporting(enter, _OldState, Data=#data{exporting_timeout_ms=ExportingTimeout, - scheduled_delay_ms=SendInterval}) -> - {OldTableName, RunnerPid} = export_spans(Data), - {keep_state, Data#data{runner_pid=RunnerPid, - handed_off_table=OldTableName}, - [{state_timeout, ExportingTimeout, exporting_timeout}, - {{timeout, export_spans}, SendInterval, export_spans}]}; -exporting(state_timeout, exporting_timeout, Data=#data{handed_off_table=ExportingTable}) -> - %% kill current exporting process because it is taking too long - %% which deletes the exporting table, so create a new one and - %% repeat the state to force another span exporting immediately - Data1 = kill_runner(Data), - new_export_table(ExportingTable), - {repeat_state, Data1}; -%% important to verify runner_pid and FromPid are the same in case it was sent -%% after kill_runner was called but before it had done the unlink -exporting(info, {'EXIT', FromPid, _}, Data=#data{runner_pid=FromPid}) -> - complete_exporting([], Data); -%% important to verify runner_pid and FromPid are the same in case it was sent -%% after kill_runner was called but before it had done the unlink -exporting(info, {completed, FromPid, FailedExporters}, Data=#data{runner_pid=FromPid}) -> - complete_exporting(FailedExporters, Data); -exporting(EventType, Event, Data) -> - handle_event_(exporting, EventType, Event, Data). - -handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) -> - keep_state_and_data; -handle_event_(State, {timeout, check_table_size}, check_table_size, Data=#data{max_queue_size=SizeLimit}) -> - case ets:info(?CURRENT_TABLE, memory) of - M when M >= SizeLimit, State =:= idle -> - Data1 = kill_runner(Data), - {next_state, exporting, Data1}; - M when M >= SizeLimit, State =:= exporting -> - Data1 = kill_runner(Data), - {repeat_state, Data1}; - _ -> - keep_state_and_data - end; -handle_event_(_, {call, From}, {register, Exporter}, Data=#data{exporters=Exporters}) -> - {keep_state, Data#data{exporters=[Exporter | Exporters]}, [{reply, From, ok}]}; -handle_event_(_, _, _, _) -> - keep_state_and_data. - -terminate(_, _, _Data) -> - ok. - -%% - -complete_exporting(FailedExporters, Data=#data{exporters=Exporters, - handed_off_table=ExportingTable}) - when ExportingTable =/= undefined -> - new_export_table(ExportingTable), - {next_state, idle, Data#data{exporters=Exporters--FailedExporters, - runner_pid=undefined, - handed_off_table=undefined}}. - -kill_runner(Data=#data{runner_pid=RunnerPid}) -> - erlang:unlink(RunnerPid), - erlang:exit(RunnerPid, kill), - Data#data{runner_pid=undefined, - handed_off_table=undefined}. - -new_export_table(Name) -> - ets:new(Name, [public, named_table, {write_concurrency, true}, duplicate_bag]). - -init_exporter({Exporter, Config}) when is_atom(Exporter) -> - {fun Exporter:export/2, Exporter:init(Config)}; -init_exporter(Exporter) when is_atom(Exporter) -> - {fun Exporter:export/2, Exporter:init([])}; -init_exporter(Exporter) when is_function(Exporter) -> - {Exporter, []}; -init_exporter({Exporter, Config}) when is_function(Exporter) -> - {Exporter, Config}. - -export_spans(#data{exporters=Exporters}) -> - CurrentTable = ?CURRENT_TABLE, - NewCurrentTable = case CurrentTable of - ?TABLE_1 -> - ?TABLE_2; - ?TABLE_2 -> - ?TABLE_1 - end, - - %% an atom is a single word so this does not trigger a global GC - persistent_term:put(?CURRENT_TABLES_KEY, NewCurrentTable), - - Self = self(), - RunnerPid = erlang:spawn_link(fun() -> send_spans(Self, Exporters) end), - ets:give_away(CurrentTable, RunnerPid, export), - {CurrentTable, RunnerPid}. - -%% Additional benefit of using a separate process is calls to `register` won't -%% timeout if the actual exporting takes longer than the call timeout -send_spans(FromPid, Exporters) -> - receive - {'ETS-TRANSFER', Table, FromPid, export} -> - TableName = ets:rename(Table, current_send_table), - FailedExporters = lists:filtermap(fun({Exporter, Config}) -> - export(Exporter, TableName, Config) - end, Exporters), - ets:delete(TableName), - completed(FromPid, FailedExporters) - end. - -completed(FromPid, FailedExporters) -> - FromPid ! {completed, self(), FailedExporters}. - -export(undefined, _, _) -> - true; -export(Exporter, SpansTid, Config) -> - %% don't let a exporter exception crash us - %% and return true if exporter failed - try - Exporter(SpansTid, Config) =:= failed_not_retryable - catch - Class:Exception:StackTrace -> - ?LOG_INFO("dropping exporter that threw exception: exporter=~p ~p:~p stacktrace=~p", - [Exporter, Class, Exception, StackTrace]), - true - end. +-callback init(term()) -> {ok, term()} | ignore. +-callback export(ets:tid(), term()) -> ok | success | failed_not_retryable | failed_retryable. +-callback shutdown(term()) -> ok. diff --git a/src/ot_exporter_pid.erl b/src/ot_exporter_pid.erl index 1b72aa5b3..abd87f61c 100644 --- a/src/ot_exporter_pid.erl +++ b/src/ot_exporter_pid.erl @@ -20,13 +20,17 @@ -behaviour(ot_exporter). -export([init/1, - export/2]). + export/2, + shutdown/1]). init(Pid) -> - Pid. + {ok, Pid}. export(SpansTid, Pid) -> ets:foldl(fun(Span, _Acc) -> Pid ! {span, Span} end, [], SpansTid), ok. + +shutdown(_) -> + ok. diff --git a/src/ot_exporter_stdout.erl b/src/ot_exporter_stdout.erl index 77e9fd899..aa87fb8e4 100644 --- a/src/ot_exporter_stdout.erl +++ b/src/ot_exporter_stdout.erl @@ -20,13 +20,17 @@ -behaviour(ot_exporter). -export([init/1, - export/2]). + export/2, + shutdown/1]). init(_) -> - ok. + {ok, []}. export(SpansTid, _) -> ets:foldl(fun(Span, _Acc) -> io:format("~p~n", [Span]) end, [], SpansTid), ok. + +shutdown(_) -> + ok. diff --git a/src/ot_exporter_tab.erl b/src/ot_exporter_tab.erl index fccff10ab..d42854538 100644 --- a/src/ot_exporter_tab.erl +++ b/src/ot_exporter_tab.erl @@ -20,13 +20,17 @@ -behaviour(ot_exporter). -export([init/1, - export/2]). + export/2, + shutdown/1]). init(Tid) -> - Tid. + {ok, Tid}. export(SpansTid, Tid) -> ets:foldl(fun(Span, _Acc) -> ets:insert(Tid, Span) end, [], SpansTid), ok. + +shutdown(_) -> + ok. diff --git a/src/ot_registry_api.erl b/src/ot_registry_api.erl new file mode 100644 index 000000000..409eceebb --- /dev/null +++ b/src/ot_registry_api.erl @@ -0,0 +1,27 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc +%% @end +%%%------------------------------------------------------------------------- +-module(ot_registry_api). + +-export([get/0, + get/1]). + +get() -> + ot_tracer_noop. + +get(_Name) -> + ot_tracer_noop. diff --git a/src/ot_registry_sup.erl b/src/ot_registry_sup.erl new file mode 100644 index 000000000..ced7ceaf7 --- /dev/null +++ b/src/ot_registry_sup.erl @@ -0,0 +1,57 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc +%% @end +%%%------------------------------------------------------------------------- +-module(ot_registry_sup). + +-behaviour(supervisor). + +-export([start_link/1]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link(Opts) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [Opts]). + +init([Opts]) -> + SupFlags = #{strategy => one_for_one, + intensity => 1, + period => 5}, + + TracerRegistry = #{id => ot_registry_tracer, + start => {ot_registry_tracer, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [ot_registry_tracer]}, + + Processors = proplists:get_value(processors, Opts, []), + BatchProcessorOpts = proplists:get_value(ot_batch_processor, Processors, []), + BatchProcessor = #{id => ot_batch_processor, + start => {ot_batch_processor, start_link, [BatchProcessorOpts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [ot_batch_processor]}, + + SpanSup = #{id => ot_span_sup, + start => {ot_span_sup, start_link, [Opts]}, + type => supervisor}, + + ChildSpecs = [BatchProcessor, SpanSup, TracerRegistry], + {ok, {SupFlags, ChildSpecs}}. diff --git a/src/ot_registry_tracer.erl b/src/ot_registry_tracer.erl new file mode 100644 index 000000000..20a8489b6 --- /dev/null +++ b/src/ot_registry_tracer.erl @@ -0,0 +1,90 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc +%% @end +%%%------------------------------------------------------------------------- +-module(ot_registry_tracer). + +-behaviour(gen_server). + +-export([start_link/1, + get/0, + get/1, + get_sampler/0, + on_start/1, + on_end/1, + add_span_processor/1]). + +-export([init/1, + handle_call/3, + handle_cast/2]). + +-include("ot_tracer.hrl"). + +-record(state, {processors :: [module()], + sampler :: ot_sampler:sampler()}). + +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []). + +get() -> + ot_tracer_default. + +get(_Name) -> + ot_tracer_default. + +get_sampler() -> + persistent_term:get({?MODULE, sampler}). + +on_start(Span) -> + call_processors(on_start, Span). + +on_end(Span) -> + call_processors(on_end, Span). + +call_processors(F, Span) -> + Processors = persistent_term:get({?MODULE, processors}, []), + [P:F(Span, Config) || {P, Config} <- Processors]. + +add_span_processor(SpanProcessor) -> + gen_server:call(?MODULE, {add_span_processor, SpanProcessor}). + +init(Opts) -> + process_flag(trap_exit, true), + + {Sampler, SamplerOpts} = proplists:get_value(sampler, Opts, {always_on, #{}}), + SamplerFun = ot_sampler:setup(Sampler, SamplerOpts), + persistent_term:put({?MODULE, sampler}, SamplerFun), + + opentelemetry:set_default_tracer_registry(?MODULE), + + Processors = proplists:get_value(processors, Opts, []), + persistent_term:put({?MODULE, processors}, Processors), + + %% DO NOT LIKE + persistent_term:put(?CTX_IMPL_KEY, ot_ctx_pdict), + persistent_term:put(?SPAN_IMPL_KEY, ot_span_ets), + + {ok, #state{sampler=SamplerFun, + processors=Processors}}. + +handle_call({add_span_processor, SpanProcessor}, _From, State=#state{processors=Processors}) -> + %% do something with these + {reply, ok, State#state{processors=Processors++[SpanProcessor]}}; +handle_call(_Msg, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. diff --git a/src/ot_span_ets.erl b/src/ot_span_ets.erl index 6bd9ebceb..a822f0c66 100644 --- a/src/ot_span_ets.erl +++ b/src/ot_span_ets.erl @@ -47,7 +47,8 @@ start_link(Opts) -> -spec start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). start_span(Name, Opts) -> {SpanCtx, Span} = ot_span_utils:start_span(Name, Opts), - _ = storage_insert(Span), + Span1 = ot_registry_tracer:on_start(Span), + _ = storage_insert(Span1), SpanCtx. %% @doc Finish a span based on its context and send to reporter. @@ -58,7 +59,7 @@ finish_span(#span_ctx{span_id=SpanId, case ets:take(?SPAN_TAB, SpanId) of [Span] -> Span1 = ot_span_utils:end_span(Span#span{tracestate=Tracestate}), - ot_exporter:store_span(Span1); + ot_registry_tracer:on_end(Span1); _ -> false end; @@ -66,8 +67,14 @@ finish_span(_) -> ok. -spec get_ctx(opentelemetry:span()) -> opentelemetry:span_ctx(). -get_ctx(_Span) -> - #span_ctx{}. +get_ctx(#span{trace_id=TraceId, + span_id=SpanId, + tracestate=TraceState, + is_recorded=IsRecorded}) -> + #span_ctx{trace_id=TraceId, + span_id=SpanId, + tracestate=TraceState, + is_recorded=IsRecorded}. -spec is_recording_events(opentelemetry:span_ctx()) -> boolean(). is_recording_events(#span_ctx{is_recorded=IsRecorded}) -> diff --git a/src/ot_span_processor.erl b/src/ot_span_processor.erl new file mode 100644 index 000000000..a5f75a539 --- /dev/null +++ b/src/ot_span_processor.erl @@ -0,0 +1,27 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc +%% @end +%%%------------------------------------------------------------------------- +-module(ot_span_processor). + +-export([]). + +-type processor_config() :: term(). + +-export_type([processor_config/0]). + +-callback on_start(opentelemetry:span(), processor_config()) -> ok. +-callback on_end(opentelemetry:span(), processor_config()) -> ok. diff --git a/src/ot_span_sup.erl b/src/ot_span_sup.erl index 439c06dcd..b8babf9b1 100644 --- a/src/ot_span_sup.erl +++ b/src/ot_span_sup.erl @@ -33,18 +33,26 @@ start_child(ChildSpec) -> supervisor:start_child(?SERVER, ChildSpec). init([Opts]) -> + SupFlags = #{strategy => one_for_one, + intensity => 1, + period => 5}, + SweeperOpts = proplists:get_value(sweeper, Opts, #{}), Sweeper = #{id => ot_span_sweeper, start => {ot_span_sweeper, start_link, [SweeperOpts]}, restart => permanent, - shutdown => 1000, + shutdown => 5000, type => worker, modules => [ot_span_sweeper]}, - SupFlags = #{strategy => one_for_one, - intensity => 0, - period => 1}, - ChildSpecs = [Sweeper], + SpanHandler = #{id => ot_span_ets, + start => {ot_span_ets, start_link, [[]]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [ot_span_ets]}, + + ChildSpecs = [SpanHandler, Sweeper], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/src/ot_span_sweeper.erl b/src/ot_span_sweeper.erl index bb3dbcad4..ea3f3df67 100644 --- a/src/ot_span_sweeper.erl +++ b/src/ot_span_sweeper.erl @@ -160,4 +160,4 @@ expired_match_spec(Time, Return) -> finish_span(Span=#span{tracestate=Tracestate}) -> %% hack to not lose tracestate when finishing without span ctx Span1 = ot_span_utils:end_span(Span#span{tracestate=Tracestate}), - ot_exporter:store_span(Span1). + ot_registry_tracer:on_end(Span1). diff --git a/src/ot_tracer.erl b/src/ot_tracer.erl index 459ec7337..c0e039b03 100644 --- a/src/ot_tracer.erl +++ b/src/ot_tracer.erl @@ -17,28 +17,20 @@ %%%------------------------------------------------------------------------- -module(ot_tracer). --export([setup/3, - start_span/1, +-export([start_span/1, start_span/2, start_span/3, with_span/1, with_span/2, + with_span/3, current_span_ctx/0, finish/0, get_binary_format/0, get_http_text_format/0]). -include("opentelemetry.hrl"). +-include("ot_tracer.hrl"). --record(state, {tracer :: module(), - sampler :: ot_sampler:sampler()}). - --define(state, (persistent_term:get({?MODULE, state}))). --define(tracer, ?state#state.tracer). --define(sampler, ?state#state.sampler). --define(CURRENT_TRACER, {?MODULE, current_tracer}). - --callback setup(map()) -> [supervisor:child_spec()]. -callback start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -callback with_span(opentelemetry:span_ctx()) -> ok. -callback finish() -> ok. @@ -47,67 +39,51 @@ -callback get_binary_format() -> binary(). -callback get_http_text_format() -> opentelemetry:http_headers(). --spec setup(module(), map(), ot_sampler:sampler()) -> [supervisor:child_spec()]. -setup(Tracer, TracerOpts, Sampler) -> - persistent_term:put({?MODULE, state}, #state{tracer=Tracer, - sampler=Sampler}), - Tracer:setup(TracerOpts). - -spec start_span(opentelemetry:span_name()) -> opentelemetry:span_ctx(). start_span(Name) -> start_span(Name, #{}). -spec start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(Name, Opts) -> - start_span(?state, Name, Opts). +start_span(SpanName, Opts) -> + Tracer = opentelemetry:get_tracer(), + start_span(Tracer, SpanName, Opts). -spec start_span(module(), opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(#state{tracer=Tracer}, Name, Opts) when is_map_key(sampler, Opts)-> - ot_ctx_pdict:with_value(?CURRENT_TRACER, Tracer), - Tracer:start_span(Name, Opts); -start_span(#state{tracer=Tracer, - sampler=Sampler}, Name, Opts) -> - ot_ctx_pdict:with_value(?CURRENT_TRACER, Tracer), - Tracer:start_span(Name, Opts#{sampler => Sampler}); -start_span(Tracer, Name, Opts) when is_map_key(sampler, Opts)-> - ot_ctx_pdict:with_value(?CURRENT_TRACER, Tracer), - Tracer:start_span(Name, Opts); start_span(Tracer, Name, Opts) -> - ot_ctx:with_value(ot_ctx_pdict, ?CURRENT_TRACER, Tracer), Tracer:start_span(Name, Opts). -spec with_span(opentelemetry:span_ctx()) -> ok. with_span(Span) -> - with_span(?tracer, Span). + Tracer = opentelemetry:get_tracer(), + with_span(Tracer, Span). -spec with_span(module() | opentelemetry:span_ctx(), opentelemetry:span_ctx() | fun()) -> ok. with_span(Span=#span_ctx{}, Fun) -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), - with_span(Tracer, Span, Fun); -with_span(Tracer, Span) -> - ot_ctx:with_value(ot_ctx_pdict, ?CURRENT_TRACER, Tracer), + Tracer = opentelemetry:get_tracer(), + Tracer:with_span(Span, Fun); +with_span(Tracer, Span) when is_atom(Tracer) -> Tracer:with_span(Span). -spec with_span(module(), opentelemetry:span_ctx(), fun()) -> ok. -with_span(Tracer, SpanCtx, Fun) -> - ot_ctx:with_value(ot_ctx_pdict, ?CURRENT_TRACER, Tracer, fun() -> Tracer:with_value(SpanCtx, Fun) end). +with_span(Tracer, SpanCtx, Fun) when is_atom(Tracer) -> + Tracer:with_value(SpanCtx, Fun). -spec finish() -> ok. finish() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), + Tracer = opentelemetry:get_tracer(), Tracer:finish(). -spec current_span_ctx() -> opentelemetry:span_ctx(). current_span_ctx() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), + Tracer = opentelemetry:get_tracer(), Tracer:current_span_ctx(). -spec get_binary_format() -> binary(). get_binary_format() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), + Tracer = opentelemetry:get_tracer(), Tracer:get_binary_format(). -spec get_http_text_format() -> opentelemetry:http_headers(). get_http_text_format() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), + Tracer = opentelemetry:get_tracer(), Tracer:get_http_text_format(). diff --git a/src/ot_tracer.hrl b/src/ot_tracer.hrl new file mode 100644 index 000000000..396b99e7d --- /dev/null +++ b/src/ot_tracer.hrl @@ -0,0 +1,7 @@ +-define(SPAN_CTX, {ot_tracer_default, span_ctx}). +-define(CTX_IMPL_KEY, {ot_tracer_default, ctx}). +-define(SPAN_IMPL_KEY, {ot_tracer_default, span}). + +-define(ctx, (persistent_term:get(?CTX_IMPL_KEY))). +-define(span, (persistent_term:get(?SPAN_IMPL_KEY))). + diff --git a/src/ot_tracer_default.erl b/src/ot_tracer_default.erl index fb42d4fa4..5a302fa2c 100644 --- a/src/ot_tracer_default.erl +++ b/src/ot_tracer_default.erl @@ -19,8 +19,7 @@ -behaviour(ot_tracer). --export([setup/1, - start_span/2, +-export([start_span/2, with_span/1, with_span/2, finish/0, @@ -28,32 +27,12 @@ get_binary_format/0, get_http_text_format/0]). --define(SPAN_CTX, {?MODULE, span_ctx}). --define(CTX_IMPL_KEY, {?MODULE, ctx}). --define(SPAN_IMPL_KEY, {?MODULE, span}). - --define(ctx, (persistent_term:get(?CTX_IMPL_KEY))). --define(span, (persistent_term:get(?SPAN_IMPL_KEY))). +-include("ot_tracer.hrl"). -type pdict_trace_ctx() :: {opentelemetry:span_ctx(), pdict_trace_ctx() | undefined}. --spec setup(map()) -> [supervisor:child_spec()]. -setup(Opts) -> - lists:filtermap(fun({ConfigKey, PersistentKey}) -> - {Module, Args} = maps:get(ConfigKey, Opts), - persistent_term:put(PersistentKey, Module), - case erlang:function_exported(Module, start_link, 1) of - true -> - {true, #{id => Module, - start => {Module, start_link, [Args]}}}; - false -> - false - end - end, [{ctx, ?CTX_IMPL_KEY}, - {span, ?SPAN_IMPL_KEY}]). - -spec start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(Name, Opts) -> +start_span(Name, Opts) when is_map_key(sampler, Opts) -> case ot_ctx:get(?ctx, ?SPAN_CTX) of {SpanCtx, _}=Ctx -> SpanCtx1 = ot_span:start_span(?span, Name, Opts#{parent => SpanCtx}), @@ -63,7 +42,9 @@ start_span(Name, Opts) -> SpanCtx = ot_span:start_span(?span, Name, Opts#{parent => undefined}), ot_ctx:with_value(?ctx, ?SPAN_CTX, {SpanCtx, undefined}), SpanCtx - end. + end; +start_span(Name, Opts) -> + start_span(Name, Opts#{sampler => ot_registry_tracer:get_sampler()}). -spec with_span(opentelemetry:span_ctx()) -> ok. with_span(SpanCtx) -> diff --git a/test/opentelemetry_SUITE.erl b/test/opentelemetry_SUITE.erl index b704de0ae..6dacce56c 100644 --- a/test/opentelemetry_SUITE.erl +++ b/test/opentelemetry_SUITE.erl @@ -13,11 +13,11 @@ all() -> {group, ot_ctx_seqtrace}]. all_testcases() -> - [child_spans, non_default_tracer]. + [child_spans]. groups() -> - [{ot_ctx_pdict, [parallel, shuffle], all_testcases()}, - {ot_ctx_seqtrace, [parallel, shuffle], all_testcases()}]. + [{ot_ctx_pdict, [shuffle], all_testcases()}, + {ot_ctx_seqtrace, [shuffle], all_testcases()}]. init_per_suite(Config) -> application:load(opentelemetry), @@ -26,13 +26,9 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -init_per_group(CtxModule, Config) -> - application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {CtxModule, []}}}), - application:set_env(opentelemetry, exporter, [{exporters, []}, - {scheduled_delay_ms, 1}]), +init_per_group(_CtxModule, Config) -> + application:set_env(opentelemetry, processors, [{ot_batch_processor, [{scheduled_delay_ms, 1}]}]), {ok, _} = application:ensure_all_started(opentelemetry), - Config. end_per_group(_, _Config) -> @@ -42,7 +38,7 @@ init_per_testcase(_, Config) -> %% adds an exporter for a new table %% spans will be exported to a separate table for each of the test cases Tid = ets:new(exported_spans, [public, bag]), - ot_exporter:register(ot_exporter_tab, Tid), + ot_batch_processor:set_exporter(ot_exporter_tab, Tid), [{tid, Tid} | Config]. end_per_testcase(_, _Config) -> @@ -83,22 +79,22 @@ child_spans(Config) -> assert_all_exported(Tid, [SpanCtx1, SpanCtx2, SpanCtx3, SpanCtx4]). -non_default_tracer(Config) -> - Tid = ?config(tid, Config), +%% non_default_tracer(Config) -> +%% Tid = ?config(tid, Config), - SpanCtx1 = ot_tracer:start_span(<<"span-1">>), - ?assertNotMatch(#span_ctx{trace_id=0, - span_id=0}, SpanCtx1), - ot_tracer:finish(), +%% SpanCtx1 = ot_tracer:start_span(<<"span-1">>), +%% ?assertNotMatch(#span_ctx{trace_id=0, +%% span_id=0}, SpanCtx1), +%% ot_tracer:finish(), - SpanCtx2 = ot_tracer:start_span(ot_tracer_noop, <<"span-2">>, #{}), - ?assertMatch(#span_ctx{trace_id=0, - span_id=0}, SpanCtx2), - ?assertMatch(SpanCtx2, ot_tracer:current_span_ctx()), - ot_tracer:finish(), +%% SpanCtx2 = ot_tracer:start_span(ot_tracer_noop, <<"span-2">>, #{}), +%% ?assertMatch(#span_ctx{trace_id=0, +%% span_id=0}, SpanCtx2), +%% ?assertMatch(SpanCtx2, ot_tracer:current_span_ctx()), +%% ot_tracer:finish(), - assert_exported(Tid, SpanCtx1), - assert_not_exported(Tid, SpanCtx2). +%% assert_exported(Tid, SpanCtx1), +%% assert_not_exported(Tid, SpanCtx2). %% diff --git a/test/ot_exporter_SUITE.erl b/test/ot_batch_processor_SUITE.erl similarity index 55% rename from test/ot_exporter_SUITE.erl rename to test/ot_batch_processor_SUITE.erl index 0fe836dc9..75da8d80a 100644 --- a/test/ot_exporter_SUITE.erl +++ b/test/ot_batch_processor_SUITE.erl @@ -1,4 +1,4 @@ --module(ot_exporter_SUITE). +-module(ot_batch_processor_SUITE). -compile(export_all). @@ -16,14 +16,25 @@ all() -> exporting_timeout_test(_Config) -> process_flag(trap_exit, true), - {ok, Pid} = ot_exporter:start_link([{exporters, [fun(_, _) -> timer:sleep(timer:minutes(10)) end]}, - {exporting_timeout_ms, 1}, - {scheduled_delay_ms, 1}]), + {ok, Pid} = ot_batch_processor:start_link([{exporter, ?MODULE}, + {exporting_timeout_ms, 1}, + {scheduled_delay_ms, 1}]), receive {'EXIT', Pid, _} -> - ct:fail(exporter_crash) + ct:fail(batch_processor_crash) after 100 -> ok end. + +%% exporter behaviour + +init(_) -> + {ok, []}. + +export(_, _) -> + timer:sleep(timer:minutes(10)). + +shutdown(_) -> + ok. diff --git a/test/ot_sweeper_SUITE.erl b/test/ot_sweeper_SUITE.erl index d2367d9e2..cb269f9e8 100644 --- a/test/ot_sweeper_SUITE.erl +++ b/test/ot_sweeper_SUITE.erl @@ -31,21 +31,22 @@ init_per_testcase(storage_size, Config) -> strategy => finish, span_ttl => 500, storage_size => 100}), - application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {ot_ctx_pdict, []}}}), - application:set_env(opentelemetry, exporter, [{exporters, [{ot_exporter_pid, self()}]}, - {scheduled_delay_ms, 1}]), + application:set_env(opentelemetry, tracer, ot_tracer_default), + application:set_env(opentelemetry, processors, [{ot_batch_processor, [{scheduled_delay_ms, 1}]}]), {ok, _} = application:ensure_all_started(opentelemetry), + + ot_batch_processor:set_exporter(ot_exporter_pid, self()), Config; init_per_testcase(Type, Config) -> application:set_env(opentelemetry, sweeper, #{interval => 250, strategy => Type, span_ttl => 500}), - application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {ot_ctx_pdict, []}}}), - application:set_env(opentelemetry, exporter, [{exporters, [{ot_exporter_pid, self()}]}, - {scheduled_delay_ms, 1}]), + application:set_env(opentelemetry, tracer, ot_tracer_default), + application:set_env(opentelemetry, processors, [{ot_batch_processor, [{scheduled_delay_ms, 1}]}]), {ok, _} = application:ensure_all_started(opentelemetry), + + ot_batch_processor:set_exporter(ot_exporter_pid, self()), + Config. end_per_testcase(_, _Config) ->