From d7037bacade3ebe72ae656816b0dda7da6f49222 Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Thu, 7 Nov 2019 09:15:59 -0700 Subject: [PATCH] split the API into a separate application This patch moves the API modules to another application, opentelemetry_api. This repo is now the SDK implementation and when booted will start the ot_tracer_server which is used to serialize configuration of the tracer and set the default_tracer through the API. --- README.md | 36 ++- include/opentelemetry.hrl | 151 --------- rebar.config | 4 +- src/opentelemetry.app.src | 13 +- src/opentelemetry.erl | 115 ------- src/opentelemetry_app.erl | 13 +- src/opentelemetry_sup.erl | 53 ++-- src/ot_batch_processor.erl | 293 ++++++++++++++++++ src/ot_ctx.erl | 53 ---- src/ot_exporter.erl | 244 +-------------- src/ot_exporter_pid.erl | 8 +- src/ot_exporter_stdout.erl | 8 +- src/ot_exporter_tab.erl | 8 +- src/ot_sampler.erl | 2 +- src/ot_span.erl | 80 ----- src/ot_span_ets.erl | 52 ++-- src/ot_span_ets.hrl | 1 + src/ot_span_processor.erl | 27 ++ src/ot_span_sup.erl | 18 +- src/ot_span_sweeper.erl | 23 +- src/ot_span_utils.erl | 2 +- src/ot_tracer.erl | 113 ------- src/ot_tracer.hrl | 12 + src/ot_tracer_default.erl | 91 +++--- src/ot_tracer_noop.erl | 69 ----- src/ot_tracer_server.erl | 65 ++++ test/opentelemetry_SUITE.erl | 93 +++--- ...SUITE.erl => ot_batch_processor_SUITE.erl} | 23 +- test/ot_samplers_SUITE.erl | 2 +- test/ot_sweeper_SUITE.erl | 55 ++-- 30 files changed, 702 insertions(+), 1025 deletions(-) delete mode 100644 include/opentelemetry.hrl delete mode 100644 src/opentelemetry.erl create mode 100644 src/ot_batch_processor.erl delete mode 100644 src/ot_ctx.erl delete mode 100644 src/ot_span.erl create mode 100644 src/ot_span_ets.hrl create mode 100644 src/ot_span_processor.erl delete mode 100644 src/ot_tracer.erl create mode 100644 src/ot_tracer.hrl delete mode 100644 src/ot_tracer_noop.erl create mode 100644 src/ot_tracer_server.erl rename test/{ot_exporter_SUITE.erl => ot_batch_processor_SUITE.erl} (50%) diff --git a/README.md b/README.md index 5bf09ba3..4ed6e1a9 100644 --- a/README.md +++ b/README.md @@ -7,11 +7,41 @@ Requires OTP 21.3 of above. ## Design -The [OpenTelemetry specification](https://github.com/open-telemetry/opentelemetry-specification) defines a language library as having 2 components, the API and the SDK. The API must not only define the interfaces of any implementation in that language but also be able to function as a minimal implementation. The SDK is the default implementation of the API that must be optional. +The [OpenTelemetry specification](https://github.com/open-telemetry/opentelemetry-specification) defines a language library as having 2 components, the API and the SDK. The API must not only define the interfaces of any implementation in that language but also be able to function as a noop implementation of the tracer. The SDK is the default implementation of the API that must be optional. -In this library the API is defined as the behaviours `ot_tracer`, `ot_span` and `ot_ctx`. It is not a separate Erlang application from the SDK, nor are there subdirectories following the [layout described in the spec](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/library-layout.md). Not using subdirectories as the spec's layout describes is simply because the OTP application structure has all source files directly under `src/`. There is no separation of API and SDK into distinct libraries because a) there is not yet support in rebar3 for using git repos with multiple applications as dependencies b) since Erlang has a flat namespace the same module naming strategy would be used whether they are separate applications or not. Additionally, a release can be configured to drop any part of an application the user chooses. So in the unlikely event there are users who wish to not include the SDK's default implementation it is possible. +If you are instrumenting a project your application should only depend on the [OpenTelemetry API](https://github.com/open-telemetry/opentelemetry-erlang-api/) application. -### Default Tracer +This repository is the Erlang's SDK implementation and should be included in the final release and configured to setup the sampler, span processors and span exporters. + +## Using + +In an Erlang project add `opentelemetry` as the first element of the release's applications: + +``` erlang +{relx, [{release, {, }, + [{opentelemetry, temporary}, + ]}, + ...]}. +``` + +In the above example `opentelemetry` is set to `temporary` so that if the `opentelemetry` application crashes, or is shutdown, it does not terminate the other applications in the project. This is optional, the `opentelemetry` application purposely sticks to `permanent` for the processes started by the root supervisor to leave it up to the end user whether they want the crash or shutdown or `opentelemetry` to be ignored or cause the shutdown of the rest of the applications in the release. + +Doing the same for an Elixir project would be: + +``` elixir +def project do + [ + ... + releases: [ + : [ + applications: [opentelemetry: :temporary] + ], + + ... + ] + ] +end +``` ## Benchmarks diff --git a/include/opentelemetry.hrl b/include/opentelemetry.hrl deleted file mode 100644 index 4f58f0ec..00000000 --- a/include/opentelemetry.hrl +++ /dev/null @@ -1,151 +0,0 @@ -%%%------------------------------------------------------------------------ -%% Copyright 2019, OpenTelemetry Authors -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%%------------------------------------------------------------------------ - -%% These records are based on protos found in the opentelemetry-proto repo: -%% src/opentelemetry/proto/trace/v1/trace.proto -%% They are not exact translations because further processing is done after -%% the span has finished and can be vendor specific. For example, there is -%% no count of the number of dropped attributes in the span record. And -%% an attribute's value can be a function to only evaluate the value if it -%% is actually used (at the time of exporting). - -%% for use in guards: sampling bit is the first bit in 8-bit trace options --define(IS_SPAN_ENABLED(TraceOptions), (TraceOptions band 1) =/= 0). - --define(MESSAGE_EVENT_TYPE_UNSPECIFIED, 'TYPE_UNSPECIFIED'). --define(MESSAGE_EVENT_TYPE_SENT, 'SENT'). --define(MESSAGE_EVENT_TYPE_RECEIVED, 'RECEIVED'). - --define(SPAN_KIND_UNSPECIFIED, 'SPAN_KIND_UNSPECIFIED'). --define(SPAN_KIND_INTERNAL, 'INTERNAL'). --define(SPAN_KIND_SERVER, 'SERVER'). --define(SPAN_KIND_CLIENT, 'CLIENT'). --define(SPAN_KIND_PRODUCER, 'PRODUCER'). --define(SPAN_KIND_CONSUMER, 'CONSUMER'). - --record(span_ctx, { - %% 128 bit int trace id - trace_id :: opentelemetry:trace_id() | undefined, - %% 64 bit int span id - span_id :: opentelemetry:span_id() | undefined, - %% 8-bit integer, lowest bit is if it is sampled - trace_flags = 1 :: integer() | undefined, - %% Tracestate represents tracing-system specific context in a list of key-value pairs. - %% Tracestate allows different vendors propagate additional information and - %% inter-operate with their legacy Id formats. - tracestate :: opentelemetry:tracestate() | undefined, - %% IsValid is a boolean flag which returns true if the SpanContext has a non-zero - %% TraceID and a non-zero SpanID. - is_valid :: boolean() | undefined, - %% true if the span context came from a remote process - is_remote :: boolean() | undefined, - %% this field is not propagated and is only here as an implementation optimization - %% If true updates like adding events are done on the span. The same as if the - %% trace flags lowest bit is 1 but simply not propagated. - is_recorded :: boolean() | undefined - }). - --record(span, { - %% name of the span - name :: unicode:unicode_binary(), - - %% 128 bit int trace id - trace_id :: opentelemetry:trace_id() | undefined, - - %% 64 bit int span id - span_id :: opentelemetry:span_id() | undefined, - %% 64 bit int parent span - parent_span_id :: opentelemetry:span_id() | undefined, - - tracestate :: opentelemetry:tracestate() | undefined, - - %% 8-bit integer, lowest bit is if it is sampled - trace_options = 1 :: integer() | undefined, - - kind = ?SPAN_KIND_UNSPECIFIED :: opentelemetry:span_kind() | undefined, - - start_time :: wts:timestamp(), - end_time :: wts:timestamp() | undefined, - - %% A set of attributes on the span. - %% Kept as a list so ets:select_replace/2 can be used to add new elements - attributes = [] :: opentelemetry:attributes() | undefined, - - %% optional stacktrace from where the span was started - stack_trace :: opentelemetry:stack_trace() | undefined, - - %% A time-stamped annotation or message event in the Span. - time_events = [] :: opentelemetry:time_events(), - - %% links to spans in other traces - links = [] :: opentelemetry:links(), - - %% An optional final status for this span. - status :: opentelemetry:status() | undefined, - - %% An optional resource that is associated with this span. If not set, this span - %% should be part of a batch that does include the resource information, unless resource - %% information is unknown. - resource :: opentelemetry:resource() | undefined, - - %% A highly recommended but not required flag that identifies when a trace - %% crosses a process boundary. True when the parent_span belongs to the - %% same process as the current span. - same_process_as_parent_span = undefined :: boolean() | undefined, - - %% An optional number of child spans that were generated while this span - %% was active. If set, allows implementation to detect missing child spans. - child_span_count = undefined :: integer() | undefined, - - %% this field is not propagated and is only here as an implementation optimization - %% If true updates like adding events are done on the span. The same as if the - %% trace flags lowest bit is 1 but simply not propagated. - is_recorded :: boolean() | undefined - }). - --record(link, { - trace_id :: opentelemetry:trace_id(), - span_id :: opentelemetry:span_id(), - attributes :: opentelemetry:attributes(), - tracestate :: opentelemetry:tracestate() - }). - --record(message_event, { - %% type of MessageEvent. Indicates whether the RPC message was sent or received. - type = 'TYPE_UNSPECIFIED' :: opentelemetry:message_event_type(), - - %% identifier for the message, which must be unique in this span. - id :: integer(), - - %% number of uncompressed bytes sent or received - uncompressed_size :: integer(), - - %% number of compressed bytes sent or received - compressed_size :: integer() - }). - --record(annotation, { - description :: unicode:unicode_binary() | undefined, - attributes :: opentelemetry:attributes() | undefined - }). - --record(status, { - code :: integer(), - %% developer-facing error message - message :: unicode:unicode_binary() - }). - -%% table to store active spans --define(SPAN_TAB, otel_span_table). diff --git a/rebar.config b/rebar.config index def4df8b..3a0037a3 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,7 @@ {erl_opts, [debug_info]}. -{deps, [{wts, "~> 0.3"}]}. +{deps, [{wts, "~> 0.3"}, + {opentelemetry_api, {git, "https://github.com/tsloughter/opentelemetry-erlang-api", + {branch, "initial-commit"}}}]}. {shell, [{apps, [opentelemetry]}]}. diff --git a/src/opentelemetry.app.src b/src/opentelemetry.app.src index 6fd83a34..9d171134 100644 --- a/src/opentelemetry.app.src +++ b/src/opentelemetry.app.src @@ -6,13 +6,14 @@ {applications, [kernel, stdlib, - wts + wts, + opentelemetry_api ]}, - {env, [{tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {ot_ctx_pdict, []}}}}, - {sampler, {always_on, #{}}}, - {exporter, [{exporters, []}, - {scheduled_delay_ms, 30000}]}]}, + {env, [{sampler, {always_on, #{}}}, + {processors, [%% #{id => my_processor, + %% module => ot_batch_processor, + %% config => #{}} + ]}]}, {modules, []}, {licenses, ["Apache 2.0"]}, diff --git a/src/opentelemetry.erl b/src/opentelemetry.erl deleted file mode 100644 index 92ae3252..00000000 --- a/src/opentelemetry.erl +++ /dev/null @@ -1,115 +0,0 @@ -%%%------------------------------------------------------------------------ -%% Copyright 2019, OpenTelemetry Authors -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% @doc The types defined here, and referencing records in opentelemetry.hrl -%% are used to store trace information while being collected on the -%% Erlang node. -%% -%% Thus, while the types are based on protos found in the opentelemetry-proto -%% repo: src/opentelemetry/proto/trace/v1/trace.proto, -%% they are not exact translations because further processing is done after -%% the span has finished and can be vendor specific. For example, there is -%% no count of the number of dropped attributes in the span record. And -%% an attribute's value can be a function to only evaluate the value if it -%% is actually used (at the time of exporting). And the stacktrace is a -%% regular Erlang stack trace. -%% @end -%%%------------------------------------------------------------------------- --module(opentelemetry). - --export([generate_trace_id/0, - generate_span_id/0]). - --include("opentelemetry.hrl"). - --export_type([trace_id/0, - span_id/0, - span_name/0, - span_ctx/0, - span/0, - span_kind/0, - link/0, - links/0, - attribute_key/0, - attribute_value/0, - attributes/0, - annotation/0, - time_events/0, - message_event/0, - message_event_type/0, - stack_trace/0, - tracestate/0, - status/0, - resource/0, - http_headers/0]). - --type trace_id() :: non_neg_integer(). --type span_id() :: non_neg_integer(). - --type span_ctx() :: #span_ctx{}. --type span() :: #span{}. --type span_name() :: unicode:unicode_binary(). - --type attribute_key() :: unicode:unicode_binary(). --type attribute_value() :: any(). --type attribute() :: {unicode:unicode_binary(), attribute_value()}. --type attributes() :: [attribute()]. - --type annotation() :: #annotation{}. --type span_kind() :: ?SPAN_KIND_INTERNAL | - ?SPAN_KIND_SERVER | - ?SPAN_KIND_CLIENT. --type message_event() :: #message_event{}. --type message_event_type() :: ?MESSAGE_EVENT_TYPE_UNSPECIFIED | - ?MESSAGE_EVENT_TYPE_SENT | - ?MESSAGE_EVENT_TYPE_RECEIVED. --type time_events() :: [{wts:timestamp(), annotation() | message_event()}]. --type link() :: #link{}. --type links() :: [#link{}]. --type status() :: #status{}. - -%% The key must begin with a lowercase letter, and can only contain -%% lowercase letters 'a'-'z', digits '0'-'9', underscores '_', dashes -%% '-', asterisks '*', and forward slashes '/'. -%% The value is opaque string up to 256 characters printable ASCII -%% RFC0020 characters (i.e., the range 0x20 to 0x7E) except ',' and '='. -%% Note that this also excludes tabs, newlines, carriage returns, etc. --type tracestate() :: [{unicode:latin1_chardata(), unicode:latin1_chardata()}]. - --type stack_trace() :: [erlang:stack_item()]. - --type resource() :: #{unicode:unicode_binary() => unicode:unicode_binary()}. - --type http_headers() :: [{unicode:unicode_binary(), unicode:unicode_binary()}]. - -%%-------------------------------------------------------------------- -%% @doc -%% Generates a 128 bit random integer to use as a trace id. -%% @end -%%-------------------------------------------------------------------- --spec generate_trace_id() -> trace_id(). -generate_trace_id() -> - uniform(2 bsl 127 - 1). %% 2 shifted left by 127 == 2 ^ 128 - -%%-------------------------------------------------------------------- -%% @doc -%% Generates a 64 bit random integer to use as a span id. -%% @end -%%-------------------------------------------------------------------- --spec generate_span_id() -> span_id(). -generate_span_id() -> - uniform(2 bsl 63 - 1). %% 2 shifted left by 63 == 2 ^ 64 - -uniform(X) -> - rand:uniform(X). diff --git a/src/opentelemetry_app.erl b/src/opentelemetry_app.erl index f7e0cf21..8084b128 100644 --- a/src/opentelemetry_app.erl +++ b/src/opentelemetry_app.erl @@ -23,18 +23,7 @@ start(_StartType, _StartArgs) -> Opts = application:get_all_env(opentelemetry), - - {sampler, {Sampler, SamplerOpts}} = lists:keyfind(sampler, 1, Opts), - SamplerFun = ot_sampler:setup(Sampler, SamplerOpts), - - %% The default sampler implementation must be passed to the tracer chosen. - %% Since the tracer is started after the rest of the supervision tree has started - %% and we only get its children, we must instantiate the sampler before anything else. - %% The sampler turns out to be instantiated before any supervision tree is started. - {tracer, {Tracer, TracerOpts}} = lists:keyfind(tracer, 1, Opts), - TracerChildren = ot_tracer:setup(Tracer, TracerOpts, SamplerFun), - - opentelemetry_sup:start_link(TracerChildren, Opts). + opentelemetry_sup:start_link(Opts). stop(_State) -> ok. diff --git a/src/opentelemetry_sup.erl b/src/opentelemetry_sup.erl index a3a4eebb..b8c4f002 100644 --- a/src/opentelemetry_sup.erl +++ b/src/opentelemetry_sup.erl @@ -19,32 +19,43 @@ -behaviour(supervisor). --export([start_link/2]). +-export([start_link/1]). -export([init/1]). -define(SERVER, ?MODULE). -start_link(Children, Opts) -> - supervisor:start_link({local, ?SERVER}, ?MODULE, [Children, Opts]). +start_link(Opts) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [Opts]). -init([Children, Opts]) -> +init([Opts]) -> SupFlags = #{strategy => one_for_one, - intensity => 0, - period => 1}, - - ExporterOpts = proplists:get_value(exporter, Opts, []), - Exporter = #{id => ot_exporter, - start => {ot_exporter, start_link, [ExporterOpts]}, - restart => permanent, - shutdown => 1000, - type => worker, - modules => [ot_exporter]}, - - ChildSpecs = [#{id => ot_span_sup, - start => {ot_span_sup, start_link, [Opts]}, - type => supervisor}, - Exporter | Children], + intensity => 1, + period => 5}, + + %% configuration server + TracerServer = #{id => ot_tracer_server, + start => {ot_tracer_server, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [ot_tracer_server]}, + + Processors = proplists:get_value(processors, Opts, []), + BatchProcessorOpts = proplists:get_value(ot_batch_processor, Processors, []), + BatchProcessor = #{id => ot_batch_processor, + start => {ot_batch_processor, start_link, [BatchProcessorOpts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [ot_batch_processor]}, + + SpanSup = #{id => ot_span_sup, + start => {ot_span_sup, start_link, [Opts]}, + type => supervisor, + restart => permanent, + shutdown => 5000, + modules => [ot_span_sup]}, + + ChildSpecs = [BatchProcessor, SpanSup, TracerServer], {ok, {SupFlags, ChildSpecs}}. - -%% internal functions diff --git a/src/ot_batch_processor.erl b/src/ot_batch_processor.erl new file mode 100644 index 00000000..022b79f1 --- /dev/null +++ b/src/ot_batch_processor.erl @@ -0,0 +1,293 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc This module has the behaviour that each exporter must implement +%% and creates the buffer of trace spans to be exported. +%% +%% The exporter process can be configured to export the current finished +%% spans based on timeouts and the size of the finished spans table. +%% +%% Timeouts: +%% exporting_timeout_ms: How long to let the exports run before killing. +%% check_table_size_ms: Timeout to check the size of the export table. +%% scheduled_delay_ms: How often to trigger running the exporters. +%% +%% The size limit of the current table where finished spans are stored can +%% be configured with the `max_queue_size' option. +%% @end +%%%----------------------------------------------------------------------- +-module(ot_batch_processor). + +-behaviour(gen_statem). +-behaviour(ot_span_processor). + +-export([start_link/1, + on_start/2, + on_end/2, + set_exporter/1, + set_exporter/2]). + +-export([init/1, + callback_mode/0, + idle/3, + exporting/3, + terminate/3]). + +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). +-include_lib("kernel/include/logger.hrl"). + +%% behaviour for exporters to implement +-type opts() :: term(). + +%% Do any initialization of the exporter here and return configuration +%% that will be passed along with a list of spans to the `export' function. +-callback init(term()) -> opts(). + +%% This function is called when the configured interval expires with any +%% spans that have been collected so far and the configuration returned in `init'. +%% Do whatever needs to be done to export each span here, the caller will block +%% until it returns. +-callback export(ets:tid(), opts()) -> ok | success | failed_not_retryable | failed_retryable. + +-record(data, {exporter :: {module(), term()} | undefined, + handed_off_table :: atom() | undefined, + runner_pid :: pid() | undefined, + max_queue_size :: integer() | infinity, + exporting_timeout_ms :: integer(), + check_table_size_ms :: integer() | infinity, + scheduled_delay_ms :: integer()}). + +-define(CURRENT_TABLES_KEY, {?MODULE, current_table}). +-define(TABLE_1, ot_export_table1). +-define(TABLE_2, ot_export_table2). +-define(CURRENT_TABLE, persistent_term:get(?CURRENT_TABLES_KEY)). + +-define(DEFAULT_MAX_QUEUE_SIZE, 2048). +-define(DEFAULT_SCHEDULED_DEPLAY_MS, timer:seconds(5)). +-define(DEFAULT_EXPORTER_TIMEOUT_MS, timer:minutes(5)). +-define(DEFAULT_CHECK_TABLE_SIZE_MS, timer:seconds(1)). + +-define(ENABLED_KEY, {?MODULE, enabled_key}). + +start_link(Opts) -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []). + +%% @equiv set_exporter(Exporter, []). +set_exporter(Exporter) -> + set_exporter(Exporter, []). + +%% @doc Sets the batch exporter `Exporter'. +-spec set_exporter(module(), term()) -> ok. +set_exporter(Exporter, Options) -> + gen_statem:call(?MODULE, {set_exporter, {Exporter, Options}}). + +-spec on_start(opentelemetry:span(), ot_span_processor:processor_config()) -> opentelemetry:span(). +on_start(Span, _) -> + Span. + +-spec on_end(opentelemetry:span(), ot_span_processor:processor_config()) + -> true | dropped | {error, invalid_span} | {error, no_export_buffer}. +on_end(Span=#span{}, _) -> + do_insert(Span); +on_end(_, _) -> + {error, invalid_span}. + +init([Args]) -> + process_flag(trap_exit, true), + + SizeLimit = proplists:get_value(max_queue_size, Args, ?DEFAULT_MAX_QUEUE_SIZE), + ExportingTimeout = proplists:get_value(exporting_timeout_ms, Args, ?DEFAULT_EXPORTER_TIMEOUT_MS), + ScheduledDelay = proplists:get_value(scheduled_delay_ms, Args, ?DEFAULT_SCHEDULED_DEPLAY_MS), + CheckTableSize = proplists:get_value(check_table_size_ms, Args, ?DEFAULT_CHECK_TABLE_SIZE_MS), + + Exporter = init_exporter(proplists:get_value(exporter, Args, undefined)), + + _Tid1 = new_export_table(?TABLE_1), + _Tid2 = new_export_table(?TABLE_2), + persistent_term:put(?CURRENT_TABLES_KEY, ?TABLE_1), + + enable(), + + {ok, idle, #data{exporter=Exporter, + handed_off_table=undefined, + max_queue_size=case SizeLimit of + infinity -> infinity; + _ -> SizeLimit div erlang:system_info(wordsize) + end, + exporting_timeout_ms=ExportingTimeout, + check_table_size_ms=CheckTableSize, + scheduled_delay_ms=ScheduledDelay}}. + +callback_mode() -> + [state_functions, state_enter]. + +idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) -> + {keep_state_and_data, [{{timeout, export_spans}, SendInterval, export_spans}]}; +idle(_, export_spans, Data) -> + {next_state, exporting, Data}; +idle(EventType, Event, Data) -> + handle_event_(idle, EventType, Event, Data). + +exporting({timeout, export_spans}, export_spans, _) -> + {keep_state_and_data, [postpone]}; +exporting(enter, _OldState, Data=#data{exporting_timeout_ms=ExportingTimeout, + scheduled_delay_ms=SendInterval}) -> + {OldTableName, RunnerPid} = export_spans(Data), + {keep_state, Data#data{runner_pid=RunnerPid, + handed_off_table=OldTableName}, + [{state_timeout, ExportingTimeout, exporting_timeout}, + {{timeout, export_spans}, SendInterval, export_spans}]}; +exporting(state_timeout, exporting_timeout, Data=#data{handed_off_table=ExportingTable}) -> + %% kill current exporting process because it is taking too long + %% which deletes the exporting table, so create a new one and + %% repeat the state to force another span exporting immediately + Data1 = kill_runner(Data), + new_export_table(ExportingTable), + {repeat_state, Data1}; +%% important to verify runner_pid and FromPid are the same in case it was sent +%% after kill_runner was called but before it had done the unlink +exporting(info, {'EXIT', FromPid, _}, Data=#data{runner_pid=FromPid}) -> + complete_exporting(Data); +%% important to verify runner_pid and FromPid are the same in case it was sent +%% after kill_runner was called but before it had done the unlink +exporting(info, {completed, FromPid}, Data=#data{runner_pid=FromPid}) -> + complete_exporting(Data); +exporting(EventType, Event, Data) -> + handle_event_(exporting, EventType, Event, Data). + +handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) -> + keep_state_and_data; +handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=MaxQueueSize}) -> + case ets:info(?CURRENT_TABLE, size) of + M when M >= MaxQueueSize -> + disable(), + keep_state_and_data; + _ -> + enable(), + keep_state_and_data + end; +handle_event_(_, {call, From}, {set_exporter, Exporter}, Data=#data{exporter=OldExporter}) -> + shutdown_exporter(OldExporter), + {keep_state, Data#data{exporter=init_exporter(Exporter)}, [{reply, From, ok}]}; +handle_event_(_, _, _, _) -> + keep_state_and_data. + +terminate(_, _, _Data) -> + %% TODO: flush buffers to exporter + ok. + +%% + +enable()-> + persistent_term:put(?ENABLED_KEY, true). + +disable() -> + persistent_term:put(?ENABLED_KEY, false). + +is_enabled() -> + persistent_term:get(?ENABLED_KEY, true). + +do_insert(Span) -> + try + case is_enabled() of + true -> + ets:insert(?CURRENT_TABLE, Span); + _ -> + dropped + end + catch + error:badarg -> + {error, no_batch_span_processor}; + _:_ -> + {error, other} + end. + +complete_exporting(Data=#data{handed_off_table=ExportingTable}) + when ExportingTable =/= undefined -> + new_export_table(ExportingTable), + {next_state, idle, Data#data{runner_pid=undefined, + handed_off_table=undefined}}. + +kill_runner(Data=#data{runner_pid=RunnerPid}) -> + erlang:unlink(RunnerPid), + erlang:exit(RunnerPid, kill), + Data#data{runner_pid=undefined, + handed_off_table=undefined}. + +new_export_table(Name) -> + ets:new(Name, [public, named_table, {write_concurrency, true}, duplicate_bag]). + +init_exporter(undefined) -> + undefined; +init_exporter({ExporterModule, Config}) when is_atom(ExporterModule) -> + case ExporterModule:init(Config) of + {ok, ExporterConfig} -> + {ExporterModule, ExporterConfig}; + ignore -> + undefined + end; +init_exporter(ExporterModule) when is_atom(ExporterModule) -> + init_exporter({ExporterModule, []}). + +shutdown_exporter(undefined) -> + ok; +shutdown_exporter({ExporterModule, Config}) -> + ExporterModule:shutdown(Config). + +export_spans(#data{exporter=Exporter}) -> + CurrentTable = ?CURRENT_TABLE, + NewCurrentTable = case CurrentTable of + ?TABLE_1 -> + ?TABLE_2; + ?TABLE_2 -> + ?TABLE_1 + end, + + %% an atom is a single word so this does not trigger a global GC + persistent_term:put(?CURRENT_TABLES_KEY, NewCurrentTable), + %% set the table to accept inserts + enable(), + + Self = self(), + RunnerPid = erlang:spawn_link(fun() -> send_spans(Self, Exporter) end), + ets:give_away(CurrentTable, RunnerPid, export), + {CurrentTable, RunnerPid}. + +%% Additional benefit of using a separate process is calls to `register` won't +%% timeout if the actual exporting takes longer than the call timeout +send_spans(FromPid, Exporter) -> + receive + {'ETS-TRANSFER', Table, FromPid, export} -> + TableName = ets:rename(Table, current_send_table), + export(Exporter, TableName), + ets:delete(TableName), + completed(FromPid) + end. + +completed(FromPid) -> + FromPid ! {completed, self()}. + +export(undefined, _) -> + true; +export({Exporter, Config}, SpansTid) -> + %% don't let a exporter exception crash us + %% and return true if exporter failed + try + Exporter:export(SpansTid, Config) =:= failed_not_retryable + catch + Class:Exception:StackTrace -> + ?LOG_INFO("exporter threw exception: exporter=~p ~p:~p stacktrace=~p", + [Exporter, Class, Exception, StackTrace]), + true + end. diff --git a/src/ot_ctx.erl b/src/ot_ctx.erl deleted file mode 100644 index 89a4cd51..00000000 --- a/src/ot_ctx.erl +++ /dev/null @@ -1,53 +0,0 @@ -%%%------------------------------------------------------------------------ -%% Copyright 2019, OpenTelemetry Authors -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% @doc -%% @end -%%%------------------------------------------------------------------------- --module(ot_ctx). - --export([get/2, - get/3, - with_value/3, - with_value/4]). - --type key() :: term(). --type instrumented_fun() :: fun(() -> term()). - --export_type([key/0, instrumented_fun/0]). - -%% Get value of `Key' from current context and return `Default' if none. --callback get(Key :: key(), Default :: term()) -> term(). - -%% Set value of `Key' in current context to `Value'. --callback with_value(Key :: key(), Value :: term()) -> ok. - --spec get(Impl :: module(), key()) -> term(). -get(Module, Key) -> get(Module, Key, undefined). - --spec get(Impl :: module(), key(), term()) -> term(). -get(Module, Key, Default) -> Module:get(Key, Default). - --spec with_value(Impl :: module(), key(), term()) -> term(). -with_value(Module, Key, Value) -> Module:with_value(Key, Value). - --spec with_value(Impl :: module(), key(), term(), instrumented_fun()) -> term(). -with_value(Module, Key, Value, Fun) -> - Orig = get(Module, Key), - try - with_value(Module, Key, Value), - Fun() - after - with_value(Module, Key, Orig) - end. diff --git a/src/ot_exporter.erl b/src/ot_exporter.erl index b1779e74..67ec136a 100644 --- a/src/ot_exporter.erl +++ b/src/ot_exporter.erl @@ -12,247 +12,11 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %% -%% @doc This module has the behaviour that each exporter must implement -%% and creates the buffer of trace spans to be exported. -%% -%% The exporter process can be configured to export the current finished -%% spans based on timeouts and the size of the finished spans table. -%% -%% Timeouts: -%% exporting_timeout_ms: How long to let the exports run before killing. -%% check_table_size_ms: Timeout to check the size of the export table. -%% scheduled_delay_ms: How often to trigger running the exporters. -%% -%% The size limit of the current table where finished spans are stored can -%% be configured with the `max_queue_size' option. +%% @doc %% @end %%%----------------------------------------------------------------------- -module(ot_exporter). --behaviour(gen_statem). - --compile({no_auto_import, [register/2]}). - --export([start_link/1, - store_span/1, - register/1, - register/2]). - --export([init/1, - callback_mode/0, - idle/3, - exporting/3, - terminate/3]). - --include("opentelemetry.hrl"). --include_lib("kernel/include/logger.hrl"). - -%% behaviour for exporters to implement --type opts() :: term(). - -%% Do any initialization of the exporter here and return configuration -%% that will be passed along with a list of spans to the `export' function. --callback init(term()) -> opts(). - -%% This function is called when the configured interval expires with any -%% spans that have been collected so far and the configuration returned in `init'. -%% Do whatever needs to be done to export each span here, the caller will block -%% until it returns. --callback export(ets:tid(), opts()) -> ok | success | failed_not_retryable | failed_retryable. - --record(data, {exporters :: [{module(), term()}], - handed_off_table :: atom() | undefined, - runner_pid :: pid() | undefined, - max_queue_size :: integer() | infinity, - exporting_timeout_ms :: integer(), - check_table_size_ms :: integer() | infinity, - scheduled_delay_ms :: integer()}). - --define(CURRENT_TABLES_KEY, {?MODULE, current_table}). --define(TABLE_1, ot_export_table1). --define(TABLE_2, ot_export_table2). --define(CURRENT_TABLE, persistent_term:get(?CURRENT_TABLES_KEY)). - --define(DEFAULT_MAX_QUEUE_SIZE, 2048). --define(DEFAULT_SCHEDULED_DEPLAY_MS, timer:seconds(5)). --define(DEFAULT_EXPORTING_TIMEOUT, timer:minutes(5)). --define(DEFAULT_CHECK_TABLE_SIZE_INTERVAL, infinity). - -start_link(Opts) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - -%% @equiv register(Exporter, []). -register(Exporter) -> - register(Exporter, []). - -%% @doc Register new traces exporter `Exporter' with `Config'. --spec register(module(), term()) -> ok. -register(Exporter, Options) -> - gen_statem:call(?MODULE, {register, init_exporter({Exporter, Options})}). - --spec store_span(opencensus:span()) -> true | {error, invalid_span} | {error, no_export_buffer}. -store_span(Span=#span{}) -> - try - ets:insert(?CURRENT_TABLE, Span) - catch - error:badarg -> - {error, no_export_buffer} - end; -store_span(_) -> - {error, invalid_span}. - -init([Args]) -> - process_flag(trap_exit, true), - - SizeLimit = proplists:get_value(max_queue_size, Args, ?DEFAULT_MAX_QUEUE_SIZE), - ExportingTimeout = proplists:get_value(exporting_timeout_ms, Args, ?DEFAULT_EXPORTING_TIMEOUT), - ScheduledDelay = proplists:get_value(scheduled_delay_ms, Args, ?DEFAULT_SCHEDULED_DEPLAY_MS), - CheckTableSize = proplists:get_value(check_table_size_ms, Args, ?DEFAULT_CHECK_TABLE_SIZE_INTERVAL), - Exporters = [init_exporter(Config) || Config <- proplists:get_value(exporters, Args, [])], - - _Tid1 = new_export_table(?TABLE_1), - _Tid2 = new_export_table(?TABLE_2), - persistent_term:put(?CURRENT_TABLES_KEY, ?TABLE_1), - - {ok, idle, #data{exporters=Exporters, - handed_off_table=undefined, - max_queue_size=case SizeLimit of - infinity -> infinity; - _ -> SizeLimit div erlang:system_info(wordsize) - end, - exporting_timeout_ms=ExportingTimeout, - check_table_size_ms=CheckTableSize, - scheduled_delay_ms=ScheduledDelay}}. - -callback_mode() -> - [state_functions, state_enter]. - -idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) -> - {keep_state_and_data, [{{timeout, export_spans}, SendInterval, export_spans}]}; -idle(_, export_spans, Data) -> - {next_state, exporting, Data}; -idle(EventType, Event, Data) -> - handle_event_(idle, EventType, Event, Data). - -exporting({timeout, export_spans}, export_spans, _) -> - {keep_state_and_data, [postpone]}; -exporting(enter, _OldState, Data=#data{exporting_timeout_ms=ExportingTimeout, - scheduled_delay_ms=SendInterval}) -> - {OldTableName, RunnerPid} = export_spans(Data), - {keep_state, Data#data{runner_pid=RunnerPid, - handed_off_table=OldTableName}, - [{state_timeout, ExportingTimeout, exporting_timeout}, - {{timeout, export_spans}, SendInterval, export_spans}]}; -exporting(state_timeout, exporting_timeout, Data=#data{handed_off_table=ExportingTable}) -> - %% kill current exporting process because it is taking too long - %% which deletes the exporting table, so create a new one and - %% repeat the state to force another span exporting immediately - Data1 = kill_runner(Data), - new_export_table(ExportingTable), - {repeat_state, Data1}; -%% important to verify runner_pid and FromPid are the same in case it was sent -%% after kill_runner was called but before it had done the unlink -exporting(info, {'EXIT', FromPid, _}, Data=#data{runner_pid=FromPid}) -> - complete_exporting([], Data); -%% important to verify runner_pid and FromPid are the same in case it was sent -%% after kill_runner was called but before it had done the unlink -exporting(info, {completed, FromPid, FailedExporters}, Data=#data{runner_pid=FromPid}) -> - complete_exporting(FailedExporters, Data); -exporting(EventType, Event, Data) -> - handle_event_(exporting, EventType, Event, Data). - -handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) -> - keep_state_and_data; -handle_event_(State, {timeout, check_table_size}, check_table_size, Data=#data{max_queue_size=SizeLimit}) -> - case ets:info(?CURRENT_TABLE, memory) of - M when M >= SizeLimit, State =:= idle -> - Data1 = kill_runner(Data), - {next_state, exporting, Data1}; - M when M >= SizeLimit, State =:= exporting -> - Data1 = kill_runner(Data), - {repeat_state, Data1}; - _ -> - keep_state_and_data - end; -handle_event_(_, {call, From}, {register, Exporter}, Data=#data{exporters=Exporters}) -> - {keep_state, Data#data{exporters=[Exporter | Exporters]}, [{reply, From, ok}]}; -handle_event_(_, _, _, _) -> - keep_state_and_data. - -terminate(_, _, _Data) -> - ok. - -%% - -complete_exporting(FailedExporters, Data=#data{exporters=Exporters, - handed_off_table=ExportingTable}) - when ExportingTable =/= undefined -> - new_export_table(ExportingTable), - {next_state, idle, Data#data{exporters=Exporters--FailedExporters, - runner_pid=undefined, - handed_off_table=undefined}}. - -kill_runner(Data=#data{runner_pid=RunnerPid}) -> - erlang:unlink(RunnerPid), - erlang:exit(RunnerPid, kill), - Data#data{runner_pid=undefined, - handed_off_table=undefined}. - -new_export_table(Name) -> - ets:new(Name, [public, named_table, {write_concurrency, true}, duplicate_bag]). - -init_exporter({Exporter, Config}) when is_atom(Exporter) -> - {fun Exporter:export/2, Exporter:init(Config)}; -init_exporter(Exporter) when is_atom(Exporter) -> - {fun Exporter:export/2, Exporter:init([])}; -init_exporter(Exporter) when is_function(Exporter) -> - {Exporter, []}; -init_exporter({Exporter, Config}) when is_function(Exporter) -> - {Exporter, Config}. - -export_spans(#data{exporters=Exporters}) -> - CurrentTable = ?CURRENT_TABLE, - NewCurrentTable = case CurrentTable of - ?TABLE_1 -> - ?TABLE_2; - ?TABLE_2 -> - ?TABLE_1 - end, - - %% an atom is a single word so this does not trigger a global GC - persistent_term:put(?CURRENT_TABLES_KEY, NewCurrentTable), - - Self = self(), - RunnerPid = erlang:spawn_link(fun() -> send_spans(Self, Exporters) end), - ets:give_away(CurrentTable, RunnerPid, export), - {CurrentTable, RunnerPid}. - -%% Additional benefit of using a separate process is calls to `register` won't -%% timeout if the actual exporting takes longer than the call timeout -send_spans(FromPid, Exporters) -> - receive - {'ETS-TRANSFER', Table, FromPid, export} -> - TableName = ets:rename(Table, current_send_table), - FailedExporters = lists:filtermap(fun({Exporter, Config}) -> - export(Exporter, TableName, Config) - end, Exporters), - ets:delete(TableName), - completed(FromPid, FailedExporters) - end. - -completed(FromPid, FailedExporters) -> - FromPid ! {completed, self(), FailedExporters}. - -export(undefined, _, _) -> - true; -export(Exporter, SpansTid, Config) -> - %% don't let a exporter exception crash us - %% and return true if exporter failed - try - Exporter(SpansTid, Config) =:= failed_not_retryable - catch - Class:Exception:StackTrace -> - ?LOG_INFO("dropping exporter that threw exception: exporter=~p ~p:~p stacktrace=~p", - [Exporter, Class, Exception, StackTrace]), - true - end. +-callback init(term()) -> {ok, term()} | ignore. +-callback export(ets:tid(), term()) -> ok | success | failed_not_retryable | failed_retryable. +-callback shutdown(term()) -> ok. diff --git a/src/ot_exporter_pid.erl b/src/ot_exporter_pid.erl index 1b72aa5b..abd87f61 100644 --- a/src/ot_exporter_pid.erl +++ b/src/ot_exporter_pid.erl @@ -20,13 +20,17 @@ -behaviour(ot_exporter). -export([init/1, - export/2]). + export/2, + shutdown/1]). init(Pid) -> - Pid. + {ok, Pid}. export(SpansTid, Pid) -> ets:foldl(fun(Span, _Acc) -> Pid ! {span, Span} end, [], SpansTid), ok. + +shutdown(_) -> + ok. diff --git a/src/ot_exporter_stdout.erl b/src/ot_exporter_stdout.erl index 77e9fd89..aa87fb8e 100644 --- a/src/ot_exporter_stdout.erl +++ b/src/ot_exporter_stdout.erl @@ -20,13 +20,17 @@ -behaviour(ot_exporter). -export([init/1, - export/2]). + export/2, + shutdown/1]). init(_) -> - ok. + {ok, []}. export(SpansTid, _) -> ets:foldl(fun(Span, _Acc) -> io:format("~p~n", [Span]) end, [], SpansTid), ok. + +shutdown(_) -> + ok. diff --git a/src/ot_exporter_tab.erl b/src/ot_exporter_tab.erl index fccff10a..d4285453 100644 --- a/src/ot_exporter_tab.erl +++ b/src/ot_exporter_tab.erl @@ -20,13 +20,17 @@ -behaviour(ot_exporter). -export([init/1, - export/2]). + export/2, + shutdown/1]). init(Tid) -> - Tid. + {ok, Tid}. export(SpansTid, Tid) -> ets:foldl(fun(Span, _Acc) -> ets:insert(Tid, Span) end, [], SpansTid), ok. + +shutdown(_) -> + ok. diff --git a/src/ot_sampler.erl b/src/ot_sampler.erl index 959524e0..7bff41cf 100644 --- a/src/ot_sampler.erl +++ b/src/ot_sampler.erl @@ -21,7 +21,7 @@ -export([setup/2]). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). -include("ot_sampler.hrl"). -type sampling_decision() :: ?NOT_RECORD | ?RECORD | ?RECORD_AND_PROPAGATE. diff --git a/src/ot_span.erl b/src/ot_span.erl deleted file mode 100644 index ce7fcb0f..00000000 --- a/src/ot_span.erl +++ /dev/null @@ -1,80 +0,0 @@ -%%%------------------------------------------------------------------------ -%% Copyright 2019, OpenTelemetry Authors -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% @doc -%% Span behaviour. -%% @end -%%%------------------------------------------------------------------------- --module(ot_span). - --export([start_span/3, - finish_span/2, - get_ctx/2, - is_recording_events/2, - set_attribute/4, - set_attributes/3, - add_events/3, - add_links/3, - set_status/3, - update_name/3]). - --type start_opts() :: #{parent => undefined | opentelemetry:span() | opentelemetry:span_ctx(), - sampler => ot_sampler:sampler(), - links => opentelemetry:links(), - is_recorded => boolean(), - kind => opentelemetry:span_kind()}. - --export_type([start_opts/0]). - --callback start_span(opentelemetry:span_name(), start_opts()) -> opentelemetry:span_ctx(). --callback finish_span(opentelemetry:span_ctx()) -> boolean() | {error, term()}. --callback get_ctx(opentelemetry:span()) -> opentelemetry:span_ctx(). --callback is_recording_events(opentelemetry:span_ctx()) -> boolean(). --callback set_attribute(opentelemetry:span_ctx(), - opentelemetry:attribute_key(), - opentelemetry:attribute_value()) -> boolean(). --callback set_attributes(opentelemetry:span_ctx(), opentelemetry:attributes()) -> boolean(). --callback add_events(opentelemetry:span_ctx(), opentelemetry:time_events()) -> boolean(). --callback set_status(opentelemetry:span_ctx(), opentelemetry:status()) -> boolean(). --callback update_name(opentelemetry:span_ctx(), opentelemetry:span_name()) -> boolean(). - -start_span(Module, Name, Opts) -> - Module:start_span(Name, Opts). - -finish_span(Module, Ctx) -> - Module:finish_span(Ctx). - -get_ctx(Module, Span) -> - Module:get_ctx(Span). - -is_recording_events(Module, SpanCtx) -> - Module:is_recording_events(SpanCtx). - -set_attribute(Module, SpanCtx, Key, Value) -> - Module:set_attribute(SpanCtx, Key, Value). - -set_attributes(Module, SpanCtx, Attributes) -> - Module:set_attributes(SpanCtx, Attributes). - -add_events(Module, SpanCtx, Events) -> - Module:add_events(SpanCtx, Events). - -add_links(Module, SpanCtx, Links) -> - Module:add_links(SpanCtx, Links). - -set_status(Module, SpanCtx, Status) -> - Module:set_status(SpanCtx, Status). - -update_name(Module, SpanCtx, Name) -> - Module:update_name(SpanCtx, Name). diff --git a/src/ot_span_ets.erl b/src/ot_span_ets.erl index 6bd9ebce..ed084679 100644 --- a/src/ot_span_ets.erl +++ b/src/ot_span_ets.erl @@ -27,7 +27,9 @@ handle_cast/2]). -export([start_span/2, - finish_span/1, + start_span/3, + end_span/1, + end_span/2, get_ctx/1, is_recording_events/1, set_attribute/3, @@ -36,38 +38,52 @@ set_status/2, update_name/2]). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). +-include("ot_span_ets.hrl"). -record(state, {}). start_link(Opts) -> gen_server:start_link(?MODULE, Opts, []). -%% @doc Start a span and insert into the active span ets table. --spec start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). start_span(Name, Opts) -> + start_span(Name, Opts, fun(Span) -> Span end). + +%% @doc Start a span and insert into the active span ets table. +-spec start_span(opentelemetry:span_name(), ot_span:start_opts(), fun()) -> opentelemetry:span_ctx(). +start_span(Name, Opts, Processors) -> {SpanCtx, Span} = ot_span_utils:start_span(Name, Opts), - _ = storage_insert(Span), + Span1 = Processors(Span), + _ = storage_insert(Span1), SpanCtx. -%% @doc Finish a span based on its context and send to reporter. --spec finish_span(opentelemetry:span_ctx()) -> boolean() | {error, term()}. -finish_span(#span_ctx{span_id=SpanId, - tracestate=Tracestate, - trace_flags=TraceOptions}) when ?IS_SPAN_ENABLED(TraceOptions) -> +end_span(SpanCtx) -> + end_span(SpanCtx, fun(Span) -> Span end). + +%% @doc End a span based on its context and send to reporter. +-spec end_span(opentelemetry:span_ctx(), fun()) -> boolean() | {error, term()}. +end_span(#span_ctx{span_id=SpanId, + tracestate=Tracestate, + trace_flags=TraceOptions}, Processors) when ?IS_SPAN_ENABLED(TraceOptions) -> case ets:take(?SPAN_TAB, SpanId) of [Span] -> Span1 = ot_span_utils:end_span(Span#span{tracestate=Tracestate}), - ot_exporter:store_span(Span1); + Processors(Span1); _ -> false end; -finish_span(_) -> +end_span(_, _) -> ok. -spec get_ctx(opentelemetry:span()) -> opentelemetry:span_ctx(). -get_ctx(_Span) -> - #span_ctx{}. +get_ctx(#span{trace_id=TraceId, + span_id=SpanId, + tracestate=TraceState, + is_recorded=IsRecorded}) -> + #span_ctx{trace_id=TraceId, + span_id=SpanId, + tracestate=TraceState, + is_recorded=IsRecorded}. -spec is_recording_events(opentelemetry:span_ctx()) -> boolean(). is_recording_events(#span_ctx{is_recorded=IsRecorded}) -> @@ -89,11 +105,11 @@ set_attributes(#span_ctx{span_id=SpanId}, NewAttributes) -> false end. --spec add_events(opentelemetry:span_ctx(), opentelemetry:time_events()) -> boolean(). -add_events(#span_ctx{span_id=SpanId}, NewTimeEvents) -> +-spec add_events(opentelemetry:span_ctx(), opentelemetry:timed_events()) -> boolean(). +add_events(#span_ctx{span_id=SpanId}, NewTimedEvents) -> case ets:lookup(?SPAN_TAB, SpanId) of - [Span=#span{time_events=TimeEvents}] -> - Span1 = Span#span{time_events=TimeEvents++NewTimeEvents}, + [Span=#span{timed_events=TimeEvents}] -> + Span1 = Span#span{timed_events=TimeEvents++NewTimedEvents}, 1 =:= ets:select_replace(?SPAN_TAB, [{Span, [], [{const, Span1}]}]); _ -> false diff --git a/src/ot_span_ets.hrl b/src/ot_span_ets.hrl new file mode 100644 index 00000000..4946dac1 --- /dev/null +++ b/src/ot_span_ets.hrl @@ -0,0 +1 @@ +-define(SPAN_TAB, otel_span_table). diff --git a/src/ot_span_processor.erl b/src/ot_span_processor.erl new file mode 100644 index 00000000..7f840008 --- /dev/null +++ b/src/ot_span_processor.erl @@ -0,0 +1,27 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc +%% @end +%%%------------------------------------------------------------------------- +-module(ot_span_processor). + +-export([]). + +-type processor_config() :: term(). + +-export_type([processor_config/0]). + +-callback on_start(opentelemetry:span(), processor_config()) -> opentelemetry:span(). +-callback on_end(opentelemetry:span(), processor_config()) -> true | dropped | {error, invalid_span} | {error, no_export_buffer}. diff --git a/src/ot_span_sup.erl b/src/ot_span_sup.erl index 439c06dc..b8babf9b 100644 --- a/src/ot_span_sup.erl +++ b/src/ot_span_sup.erl @@ -33,18 +33,26 @@ start_child(ChildSpec) -> supervisor:start_child(?SERVER, ChildSpec). init([Opts]) -> + SupFlags = #{strategy => one_for_one, + intensity => 1, + period => 5}, + SweeperOpts = proplists:get_value(sweeper, Opts, #{}), Sweeper = #{id => ot_span_sweeper, start => {ot_span_sweeper, start_link, [SweeperOpts]}, restart => permanent, - shutdown => 1000, + shutdown => 5000, type => worker, modules => [ot_span_sweeper]}, - SupFlags = #{strategy => one_for_one, - intensity => 0, - period => 1}, - ChildSpecs = [Sweeper], + SpanHandler = #{id => ot_span_ets, + start => {ot_span_ets, start_link, [[]]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [ot_span_ets]}, + + ChildSpecs = [SpanHandler, Sweeper], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/src/ot_span_sweeper.erl b/src/ot_span_sweeper.erl index bb3dbcad..3b1c710a 100644 --- a/src/ot_span_sweeper.erl +++ b/src/ot_span_sweeper.erl @@ -31,11 +31,12 @@ -export([storage_size/0]). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). +-include("ot_span_ets.hrl"). -include_lib("kernel/include/logger.hrl"). -record(data, {interval :: integer() | infinity, - strategy :: drop | finish | failed_attribute_and_finish | fun((opencensus:span()) -> ok), + strategy :: drop | end_span | failed_attribute_and_end_span | fun((opencensus:span()) -> ok), ttl :: integer() | infinity, storage_size :: integer() | infinity}). @@ -113,7 +114,7 @@ sweep_spans(drop, TTL) -> NumDeleted -> ?LOG_INFO("sweep old spans: ttl=~p num_dropped=~p", [TTL, NumDeleted]) end; -sweep_spans(finish, TTL) -> +sweep_spans(end_span, TTL) -> Expired = select_expired(TTL), [begin case ets:take(?SPAN_TAB, SpanId) of @@ -121,11 +122,11 @@ sweep_spans(finish, TTL) -> %% must have finished without needing to be swept ok; [Span] -> - finish_span(Span) + end_span(Span) end end || SpanId <- Expired], ok; -sweep_spans(failed_attribute_and_finish, TTL) -> +sweep_spans(failed_attribute_and_end_span, TTL) -> ExpiredSpanIds = select_expired(TTL), [begin case ets:take(?SPAN_TAB, SpanId) of @@ -134,7 +135,7 @@ sweep_spans(failed_attribute_and_finish, TTL) -> ok; [Span=#span{attributes=Attributes}] -> Span1 = Span#span{attributes=Attributes ++ [{<<"finished_by_sweeper">>, true}]}, - finish_span(Span1) + end_span(Span1) end end || SpanId <- ExpiredSpanIds], ok; @@ -145,7 +146,7 @@ sweep_spans(Fun, TTL) when is_function(Fun) -> %% ignore these functions because dialyzer doesn't like match spec use of '_' -dialyzer({nowarn_function, expired_match_spec/2}). --dialyzer({nowarn_function, finish_span/1}). +-dialyzer({nowarn_function, end_span/1}). -dialyzer({nowarn_function, select_expired/1}). select_expired(TTL) -> @@ -157,7 +158,9 @@ expired_match_spec(Time, Return) -> [{'<', '$2', Time}], [Return]}]. -finish_span(Span=#span{tracestate=Tracestate}) -> - %% hack to not lose tracestate when finishing without span ctx +end_span(Span=#span{tracestate=Tracestate}) -> + %% hack to not lose tracestate when ending without span ctx Span1 = ot_span_utils:end_span(Span#span{tracestate=Tracestate}), - ot_exporter:store_span(Span1). + Tracer = opentelemetry:get_tracer(), + Processors = ot_tracer_default:on_end(Tracer), + Processors(Span1). diff --git a/src/ot_span_utils.erl b/src/ot_span_utils.erl index b820acfc..e89f63c7 100644 --- a/src/ot_span_utils.erl +++ b/src/ot_span_utils.erl @@ -21,7 +21,7 @@ -export([start_span/2, end_span/1]). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). -include("ot_sampler.hrl"). -type start_opts() :: #{parent => undefined | opentelemetry:span() | opentelemetry:span_ctx(), diff --git a/src/ot_tracer.erl b/src/ot_tracer.erl deleted file mode 100644 index 459ec733..00000000 --- a/src/ot_tracer.erl +++ /dev/null @@ -1,113 +0,0 @@ -%%%------------------------------------------------------------------------ -%% Copyright 2019, OpenTelemetry Authors -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% @doc -%% @end -%%%------------------------------------------------------------------------- --module(ot_tracer). - --export([setup/3, - start_span/1, - start_span/2, - start_span/3, - with_span/1, - with_span/2, - current_span_ctx/0, - finish/0, - get_binary_format/0, - get_http_text_format/0]). - --include("opentelemetry.hrl"). - --record(state, {tracer :: module(), - sampler :: ot_sampler:sampler()}). - --define(state, (persistent_term:get({?MODULE, state}))). --define(tracer, ?state#state.tracer). --define(sampler, ?state#state.sampler). --define(CURRENT_TRACER, {?MODULE, current_tracer}). - --callback setup(map()) -> [supervisor:child_spec()]. --callback start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). --callback with_span(opentelemetry:span_ctx()) -> ok. --callback finish() -> ok. --callback current_span_ctx() -> opentelemetry:span_ctx(). -%% -callback get_current_span() -> opentelemetry:span(). --callback get_binary_format() -> binary(). --callback get_http_text_format() -> opentelemetry:http_headers(). - --spec setup(module(), map(), ot_sampler:sampler()) -> [supervisor:child_spec()]. -setup(Tracer, TracerOpts, Sampler) -> - persistent_term:put({?MODULE, state}, #state{tracer=Tracer, - sampler=Sampler}), - Tracer:setup(TracerOpts). - --spec start_span(opentelemetry:span_name()) -> opentelemetry:span_ctx(). -start_span(Name) -> - start_span(Name, #{}). - --spec start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(Name, Opts) -> - start_span(?state, Name, Opts). - --spec start_span(module(), opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(#state{tracer=Tracer}, Name, Opts) when is_map_key(sampler, Opts)-> - ot_ctx_pdict:with_value(?CURRENT_TRACER, Tracer), - Tracer:start_span(Name, Opts); -start_span(#state{tracer=Tracer, - sampler=Sampler}, Name, Opts) -> - ot_ctx_pdict:with_value(?CURRENT_TRACER, Tracer), - Tracer:start_span(Name, Opts#{sampler => Sampler}); -start_span(Tracer, Name, Opts) when is_map_key(sampler, Opts)-> - ot_ctx_pdict:with_value(?CURRENT_TRACER, Tracer), - Tracer:start_span(Name, Opts); -start_span(Tracer, Name, Opts) -> - ot_ctx:with_value(ot_ctx_pdict, ?CURRENT_TRACER, Tracer), - Tracer:start_span(Name, Opts). - --spec with_span(opentelemetry:span_ctx()) -> ok. -with_span(Span) -> - with_span(?tracer, Span). - --spec with_span(module() | opentelemetry:span_ctx(), opentelemetry:span_ctx() | fun()) -> ok. -with_span(Span=#span_ctx{}, Fun) -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), - with_span(Tracer, Span, Fun); -with_span(Tracer, Span) -> - ot_ctx:with_value(ot_ctx_pdict, ?CURRENT_TRACER, Tracer), - Tracer:with_span(Span). - --spec with_span(module(), opentelemetry:span_ctx(), fun()) -> ok. -with_span(Tracer, SpanCtx, Fun) -> - ot_ctx:with_value(ot_ctx_pdict, ?CURRENT_TRACER, Tracer, fun() -> Tracer:with_value(SpanCtx, Fun) end). - --spec finish() -> ok. -finish() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), - Tracer:finish(). - --spec current_span_ctx() -> opentelemetry:span_ctx(). -current_span_ctx() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), - Tracer:current_span_ctx(). - --spec get_binary_format() -> binary(). -get_binary_format() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), - Tracer:get_binary_format(). - --spec get_http_text_format() -> opentelemetry:http_headers(). -get_http_text_format() -> - Tracer = ot_ctx:get(ot_ctx_pdict, ?CURRENT_TRACER, ?tracer), - Tracer:get_http_text_format(). diff --git a/src/ot_tracer.hrl b/src/ot_tracer.hrl new file mode 100644 index 00000000..306f682c --- /dev/null +++ b/src/ot_tracer.hrl @@ -0,0 +1,12 @@ +-define(SPAN_CTX, {ot_tracer_default, span_ctx}). +-define(CTX_IMPL_KEY, {ot_tracer_default, ctx}). + +-define(ctx, (persistent_term:get(?CTX_IMPL_KEY))). + +-record(tracer, {name :: unicode:unicode_binary() | undefined, + module :: module(), + span_module :: module(), + ctx_module :: module(), + processors :: [{module(), term()}], + sampler :: ot_sampler:sampler(), + resource :: term() | undefined}). diff --git a/src/ot_tracer_default.erl b/src/ot_tracer_default.erl index fb42d4fa..0c8a206a 100644 --- a/src/ot_tracer_default.erl +++ b/src/ot_tracer_default.erl @@ -19,62 +19,57 @@ -behaviour(ot_tracer). --export([setup/1, - start_span/2, - with_span/1, +-export([start_span/3, with_span/2, - finish/0, - current_span_ctx/0, - get_binary_format/0, - get_http_text_format/0]). + with_span/3, + end_span/1, + on_end/1, + current_span_ctx/1, + get_binary_format/1, + get_http_text_format/1]). --define(SPAN_CTX, {?MODULE, span_ctx}). --define(CTX_IMPL_KEY, {?MODULE, ctx}). --define(SPAN_IMPL_KEY, {?MODULE, span}). +%% tracer access functions +-export([span_module/1]). --define(ctx, (persistent_term:get(?CTX_IMPL_KEY))). --define(span, (persistent_term:get(?SPAN_IMPL_KEY))). +-include("ot_tracer.hrl"). -type pdict_trace_ctx() :: {opentelemetry:span_ctx(), pdict_trace_ctx() | undefined}. --spec setup(map()) -> [supervisor:child_spec()]. -setup(Opts) -> - lists:filtermap(fun({ConfigKey, PersistentKey}) -> - {Module, Args} = maps:get(ConfigKey, Opts), - persistent_term:put(PersistentKey, Module), - case erlang:function_exported(Module, start_link, 1) of - true -> - {true, #{id => Module, - start => {Module, start_link, [Args]}}}; - false -> - false - end - end, [{ctx, ?CTX_IMPL_KEY}, - {span, ?SPAN_IMPL_KEY}]). - --spec start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(Name, Opts) -> +-spec start_span(opentelemetry:tracer(), opentelemetry:span_name(), ot_span:start_opts()) + -> opentelemetry:span_ctx(). +start_span(Tracer, Name, Opts) when is_map_key(sampler, Opts) -> case ot_ctx:get(?ctx, ?SPAN_CTX) of {SpanCtx, _}=Ctx -> - SpanCtx1 = ot_span:start_span(?span, Name, Opts#{parent => SpanCtx}), + Processors = on_start(Tracer), + SpanCtx1 = ot_span_ets:start_span(Name, Opts#{parent => SpanCtx}, Processors), ot_ctx:with_value(?ctx, ?SPAN_CTX, {SpanCtx1, Ctx}), SpanCtx1; _ -> - SpanCtx = ot_span:start_span(?span, Name, Opts#{parent => undefined}), + Processors = on_start(Tracer), + SpanCtx = ot_span_ets:start_span(Name, Opts#{parent => undefined}, Processors), ot_ctx:with_value(?ctx, ?SPAN_CTX, {SpanCtx, undefined}), SpanCtx - end. + end; +start_span(Tracer={_, #tracer{sampler=Sampler}}, Name, Opts) -> + start_span(Tracer, Name, Opts#{sampler => Sampler}). + + +on_start({_, #tracer{processors=Processors}}) -> + fun(Span) -> [P:on_start(Span, Config) || {P, Config} <- Processors] end. --spec with_span(opentelemetry:span_ctx()) -> ok. -with_span(SpanCtx) -> +on_end({_, #tracer{processors=Processors}}) -> + fun(Span) -> [P:on_end(Span, Config) || {P, Config} <- Processors] end. + +-spec with_span(opentelemetry:tracer(), opentelemetry:span_ctx()) -> ok. +with_span(_Tracer, SpanCtx) -> ot_ctx:with_value(?ctx, ?SPAN_CTX, {SpanCtx, undefined}). --spec with_span(opentelemetry:span_ctx(), fun()) -> ok. -with_span(SpanCtx, Fun) -> +-spec with_span(opentelemetry:tracer(), opentelemetry:span_ctx(), fun()) -> ok. +with_span(_Tracer, SpanCtx, Fun) -> ot_ctx:with_value(?ctx, ?SPAN_CTX, {SpanCtx, undefined}, Fun). --spec current_span_ctx() -> opentelemetry:span_ctx(). -current_span_ctx() -> +-spec current_span_ctx(opentelemetry:tracer()) -> opentelemetry:span_ctx(). +current_span_ctx(_Tracer) -> case ot_ctx:get(?ctx, ?SPAN_CTX) of {SpanCtx, _ParentPdictSpanCtx} -> SpanCtx; @@ -89,23 +84,27 @@ current_span_ctx() -> current_ctx() -> ot_ctx:get(?ctx, ?SPAN_CTX). +span_module({_, #tracer{span_module=SpanModule}}) -> + SpanModule. + %%-------------------------------------------------------------------- %% @doc -%% Finishes the span in the current pdict context. And sets the parent +%% Ends the span in the current pdict context. And sets the parent %% as the current span ctx or undefined if there is no local parent. %% @end %%-------------------------------------------------------------------- --spec finish() -> ok. -finish() -> +-spec end_span(opentelemetry:tracer()) -> ok. +end_span(Tracer) -> {SpanCtx, ParentCtx} = current_ctx(), - ot_span:finish_span(?span, SpanCtx), + Processors = on_end(Tracer), + ot_span_ets:end_span(SpanCtx, Processors), ot_ctx:with_value(?ctx, ?SPAN_CTX, ParentCtx), ok. --spec get_binary_format() -> binary(). -get_binary_format() -> +-spec get_binary_format(opentelemetry:tracer()) -> binary(). +get_binary_format(_) -> <<>>. --spec get_http_text_format() -> opentelemetry:http_headers(). -get_http_text_format() -> +-spec get_http_text_format(opentelemetry:tracer()) -> opentelemetry:http_headers(). +get_http_text_format(_) -> []. diff --git a/src/ot_tracer_noop.erl b/src/ot_tracer_noop.erl deleted file mode 100644 index 391915f5..00000000 --- a/src/ot_tracer_noop.erl +++ /dev/null @@ -1,69 +0,0 @@ -%%%------------------------------------------------------------------------ -%% Copyright 2019, OpenTelemetry Authors -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% @doc -%% @end -%%%------------------------------------------------------------------------- --module(ot_tracer_noop). - --behaviour(ot_tracer). - --export([setup/1, - start_span/2, - with_span/1, - with_span/2, - finish/0, - current_span_ctx/0, - get_binary_format/0, - get_http_text_format/0]). - --include("opentelemetry.hrl"). - --define(NOOP_SPAN_CTX, #span_ctx{trace_id=0, - span_id=0, - trace_flags=0, - tracestate=[], - is_valid=false}). - --spec setup(map()) -> [supervisor:child_spec()]. -setup(_Opts) -> - []. - --spec start_span(opentelemetry:span_name(), ot_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(_Name, _) -> - ?NOOP_SPAN_CTX. - --spec with_span(opentelemetry:span_ctx()) -> ok. -with_span(_SpanCtx) -> - ok. - --spec with_span(opentelemetry:span_ctx(), fun()) -> ok. -with_span(_SpanCtx, _) -> - ok. - --spec current_span_ctx() -> opentelemetry:span_ctx(). -current_span_ctx() -> - ?NOOP_SPAN_CTX. - --spec finish() -> ok. -finish() -> - ok. - --spec get_binary_format() -> binary(). -get_binary_format() -> - <<>>. - --spec get_http_text_format() -> opentelemetry:http_headers(). -get_http_text_format() -> - []. diff --git a/src/ot_tracer_server.erl b/src/ot_tracer_server.erl new file mode 100644 index 00000000..093edc95 --- /dev/null +++ b/src/ot_tracer_server.erl @@ -0,0 +1,65 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2019, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc +%% @end +%%%------------------------------------------------------------------------- +-module(ot_tracer_server). + +-behaviour(gen_server). + +-export([start_link/1, + add_span_processor/1]). + +-export([init/1, + handle_call/3, + handle_cast/2]). + +-include("ot_tracer.hrl"). + +-record(state, {processors :: [module()], + sampler :: ot_sampler:sampler()}). + +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []). + +add_span_processor(SpanProcessor) -> + gen_server:call(?MODULE, {add_span_processor, SpanProcessor}). + +init(Opts) -> + {Sampler, SamplerOpts} = proplists:get_value(sampler, Opts, {always_on, #{}}), + SamplerFun = ot_sampler:setup(Sampler, SamplerOpts), + Processors = proplists:get_value(processors, Opts, []), + + Tracer = #tracer{module=ot_tracer_default, + sampler=SamplerFun, + processors=Processors, + span_module=ot_span_ets, + ctx_module=ot_ctx_pdict}, + opentelemetry:set_default_tracer({ot_tracer_default, Tracer}), + + %% TODO: remove this and use ctx module from tracer state everywhere + persistent_term:put(?CTX_IMPL_KEY, ot_ctx_pdict), + + {ok, #state{sampler=SamplerFun, + processors=Processors}}. + +handle_call({add_span_processor, SpanProcessor}, _From, State=#state{processors=Processors}) -> + %% do something with these + {reply, ok, State#state{processors=Processors++[SpanProcessor]}}; +handle_call(_Msg, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. diff --git a/test/opentelemetry_SUITE.erl b/test/opentelemetry_SUITE.erl index b704de0a..1df84178 100644 --- a/test/opentelemetry_SUITE.erl +++ b/test/opentelemetry_SUITE.erl @@ -5,7 +5,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). -include("ot_test_utils.hrl"). all() -> @@ -13,11 +13,11 @@ all() -> {group, ot_ctx_seqtrace}]. all_testcases() -> - [child_spans, non_default_tracer]. + [child_spans, update_span_data]. groups() -> - [{ot_ctx_pdict, [parallel, shuffle], all_testcases()}, - {ot_ctx_seqtrace, [parallel, shuffle], all_testcases()}]. + [{ot_ctx_pdict, [shuffle], all_testcases()}, + {ot_ctx_seqtrace, [shuffle], all_testcases()}]. init_per_suite(Config) -> application:load(opentelemetry), @@ -26,13 +26,9 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -init_per_group(CtxModule, Config) -> - application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {CtxModule, []}}}), - application:set_env(opentelemetry, exporter, [{exporters, []}, - {scheduled_delay_ms, 1}]), +init_per_group(_CtxModule, Config) -> + application:set_env(opentelemetry, processors, [{ot_batch_processor, [{scheduled_delay_ms, 1}]}]), {ok, _} = application:ensure_all_started(opentelemetry), - Config. end_per_group(_, _Config) -> @@ -42,7 +38,7 @@ init_per_testcase(_, Config) -> %% adds an exporter for a new table %% spans will be exported to a separate table for each of the test cases Tid = ets:new(exported_spans, [public, bag]), - ot_exporter:register(ot_exporter_tab, Tid), + ot_batch_processor:set_exporter(ot_exporter_tab, Tid), [{tid, Tid} | Config]. end_per_testcase(_, _Config) -> @@ -52,53 +48,68 @@ child_spans(Config) -> Tid = ?config(tid, Config), %% start a span and 2 children - SpanCtx1 = ot_tracer:start_span(<<"span-1">>), - SpanCtx2 = ot_tracer:start_span(<<"span-2">>), - SpanCtx3 = ot_tracer:start_span(<<"span-3">>), + SpanCtx1 = otel:start_span(<<"span-1">>), + SpanCtx2 = otel:start_span(<<"span-2">>), + SpanCtx3 = otel:start_span(<<"span-3">>), - %% finish the 3rd span - ?assertMatch(SpanCtx3, ot_tracer:current_span_ctx()), - ot_tracer:finish(), + %% end the 3rd span + ?assertMatch(SpanCtx3, otel:current_span_ctx()), + otel:end_span(), assert_exported(Tid, SpanCtx3), %% 2nd span should be the current span ctx now - ?assertMatch(SpanCtx2, ot_tracer:current_span_ctx()), + ?assertMatch(SpanCtx2, otel:current_span_ctx()), %% start another child of the 2nd span - SpanCtx4 = ot_tracer:start_span(<<"span-4">>), - ?assertMatch(SpanCtx4, ot_tracer:current_span_ctx()), + SpanCtx4 = otel:start_span(<<"span-4">>), + ?assertMatch(SpanCtx4, otel:current_span_ctx()), - %% finish 4th span and 2nd should be current - ot_tracer:finish(), - ?assertMatch(SpanCtx2, ot_tracer:current_span_ctx()), + %% end 4th span and 2nd should be current + otel:end_span(), + ?assertMatch(SpanCtx2, otel:current_span_ctx()), - %% finish 2th span and 1st should be current - ot_tracer:finish(), - ?assertMatch(SpanCtx1, ot_tracer:current_span_ctx()), + %% end 2th span and 1st should be current + otel:end_span(), + ?assertMatch(SpanCtx1, otel:current_span_ctx()), - %% finish first and no span should be current ctx - ot_tracer:finish(), - ?assertMatch(undefined, ot_tracer:current_span_ctx()), + %% end first and no span should be current ctx + otel:end_span(), + ?assertMatch(undefined, otel:current_span_ctx()), assert_all_exported(Tid, [SpanCtx1, SpanCtx2, SpanCtx3, SpanCtx4]). -non_default_tracer(Config) -> +update_span_data(Config) -> Tid = ?config(tid, Config), - SpanCtx1 = ot_tracer:start_span(<<"span-1">>), - ?assertNotMatch(#span_ctx{trace_id=0, - span_id=0}, SpanCtx1), - ot_tracer:finish(), + Links = [#link{trace_id=0, + span_id=0, + attributes=[], + tracestate=[]}], + + SpanCtx1=#span_ctx{trace_id=TraceId, + span_id=SpanId} = otel:start_span(<<"span-1">>, #{links => Links}), + otel:set_attribute(<<"key-1">>, <<"value-1">>), + + TimedEvents = opentelemetry:timed_events([{opentelemetry:timestamp(), <<"timed-event-name">>, []}]), + Status = opentelemetry:status(0, <<"status">>), + + %% with spanctx and tracer passed as an argument + Tracer = opentelemetry:get_tracer(), + ot_span:set_status(Tracer, SpanCtx1, Status), - SpanCtx2 = ot_tracer:start_span(ot_tracer_noop, <<"span-2">>, #{}), - ?assertMatch(#span_ctx{trace_id=0, - span_id=0}, SpanCtx2), - ?assertMatch(SpanCtx2, ot_tracer:current_span_ctx()), - ot_tracer:finish(), + ot_span:add_events(Tracer, SpanCtx1, TimedEvents), - assert_exported(Tid, SpanCtx1), - assert_not_exported(Tid, SpanCtx2). + ?assertMatch(SpanCtx1, otel:current_span_ctx()), + otel:end_span(), + + ?UNTIL([] =/= ets:match(Tid, #span{trace_id=TraceId, + span_id=SpanId, + attributes=[{<<"key-1">>, <<"value-1">>}], + links=Links, + status=Status, + timed_events=TimedEvents, + _='_'})). %% diff --git a/test/ot_exporter_SUITE.erl b/test/ot_batch_processor_SUITE.erl similarity index 50% rename from test/ot_exporter_SUITE.erl rename to test/ot_batch_processor_SUITE.erl index 0fe836dc..aa0904db 100644 --- a/test/ot_exporter_SUITE.erl +++ b/test/ot_batch_processor_SUITE.erl @@ -1,11 +1,11 @@ --module(ot_exporter_SUITE). +-module(ot_batch_processor_SUITE). -compile(export_all). -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). -include("ot_sampler.hrl"). all() -> @@ -16,14 +16,25 @@ all() -> exporting_timeout_test(_Config) -> process_flag(trap_exit, true), - {ok, Pid} = ot_exporter:start_link([{exporters, [fun(_, _) -> timer:sleep(timer:minutes(10)) end]}, - {exporting_timeout_ms, 1}, - {scheduled_delay_ms, 1}]), + {ok, Pid} = ot_batch_processor:start_link([{exporter, ?MODULE}, + {exporting_timeout_ms, 1}, + {scheduled_delay_ms, 1}]), receive {'EXIT', Pid, _} -> - ct:fail(exporter_crash) + ct:fail(batch_processor_crash) after 100 -> ok end. + +%% exporter behaviour + +init(_) -> + {ok, []}. + +export(_, _) -> + timer:sleep(timer:minutes(10)). + +shutdown(_) -> + ok. diff --git a/test/ot_samplers_SUITE.erl b/test/ot_samplers_SUITE.erl index 5990c313..607699e6 100644 --- a/test/ot_samplers_SUITE.erl +++ b/test/ot_samplers_SUITE.erl @@ -5,7 +5,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). -include("ot_sampler.hrl"). all() -> diff --git a/test/ot_sweeper_SUITE.erl b/test/ot_sweeper_SUITE.erl index d2367d9e..48cd0dbc 100644 --- a/test/ot_sweeper_SUITE.erl +++ b/test/ot_sweeper_SUITE.erl @@ -10,13 +10,15 @@ -include_lib("common_test/include/ct.hrl"). -include("ot_test_utils.hrl"). --include("opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). + +-include("../src/ot_span_ets.hrl"). all() -> [storage_size, drop, - finish, - failed_attribute_and_finish]. + end_span, + failed_attribute_and_end_span]. init_per_suite(Config) -> application:load(opentelemetry), @@ -28,24 +30,25 @@ end_per_suite(_Config) -> init_per_testcase(storage_size, Config) -> application:set_env(opentelemetry, sweeper, #{interval => 250, - strategy => finish, + strategy => end_span, span_ttl => 500, storage_size => 100}), - application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {ot_ctx_pdict, []}}}), - application:set_env(opentelemetry, exporter, [{exporters, [{ot_exporter_pid, self()}]}, - {scheduled_delay_ms, 1}]), + application:set_env(opentelemetry, tracer, ot_tracer_default), + application:set_env(opentelemetry, processors, [{ot_batch_processor, [{scheduled_delay_ms, 1}]}]), {ok, _} = application:ensure_all_started(opentelemetry), + + ot_batch_processor:set_exporter(ot_exporter_pid, self()), Config; init_per_testcase(Type, Config) -> application:set_env(opentelemetry, sweeper, #{interval => 250, strategy => Type, span_ttl => 500}), - application:set_env(opentelemetry, tracer, {ot_tracer_default, #{span => {ot_span_ets, []}, - ctx => {ot_ctx_pdict, []}}}), - application:set_env(opentelemetry, exporter, [{exporters, [{ot_exporter_pid, self()}]}, - {scheduled_delay_ms, 1}]), + application:set_env(opentelemetry, tracer, ot_tracer_default), + application:set_env(opentelemetry, processors, [{ot_batch_processor, [{scheduled_delay_ms, 1}]}]), {ok, _} = application:ensure_all_started(opentelemetry), + + ot_batch_processor:set_exporter(ot_exporter_pid, self()), + Config. end_per_testcase(_, _Config) -> @@ -54,10 +57,10 @@ end_per_testcase(_, _Config) -> storage_size(_Config) -> SpanName1 = <<"span-1">>, - SpanCtx = ot_tracer:start_span(SpanName1), + SpanCtx = otel:start_span(SpanName1), ChildSpanName1 = <<"child-span-1">>, - ChildSpanCtx = ot_tracer:start_span(ChildSpanName1), + ChildSpanCtx = otel:start_span(ChildSpanName1), [ChildSpanData] = ets:lookup(?SPAN_TAB, ChildSpanCtx#span_ctx.span_id), ?assertEqual(ChildSpanName1, ChildSpanData#span.name), @@ -80,21 +83,21 @@ storage_size(_Config) -> drop(_Config) -> SpanName1 = <<"span-1">>, - SpanCtx = ot_tracer:start_span(SpanName1), + SpanCtx = otel:start_span(SpanName1), ChildSpanName1 = <<"child-span-1">>, - ChildSpanCtx = ot_tracer:start_span(ChildSpanName1), + ChildSpanCtx = otel:start_span(ChildSpanName1), [ChildSpanData] = ets:lookup(?SPAN_TAB, ChildSpanCtx#span_ctx.span_id), ?assertEqual(ChildSpanName1, ChildSpanData#span.name), ?assertEqual(SpanCtx#span_ctx.span_id, ChildSpanData#span.parent_span_id), - ot_tracer:finish(), + otel:end_span(), %% wait until the sweeper sweeps away the parent span ?UNTIL(ets:tab2list(?SPAN_TAB) =:= []), - ot_tracer:finish(), + otel:end_span(), receive {span, S=#span{name=Name}} when Name =:= ChildSpanName1 -> @@ -119,13 +122,13 @@ drop(_Config) -> no_span end). -finish(_Config) -> +end_span(_Config) -> SpanName1 = <<"span-1">>, - _SpanCtx = ot_tracer:start_span(SpanName1), + _SpanCtx = otel:start_span(SpanName1), ChildSpanName1 = <<"child-span-1">>, - _ChildSpanCtx = ot_tracer:start_span(ChildSpanName1), - ot_tracer:finish(), + _ChildSpanCtx = otel:start_span(ChildSpanName1), + otel:end_span(), %% wait until the sweeper sweeps away the parent span ?UNTIL(ets:tab2list(?SPAN_TAB) =:= []), @@ -143,18 +146,18 @@ finish(_Config) -> end end, [SpanName1, ChildSpanName1]). -failed_attribute_and_finish(_Config) -> +failed_attribute_and_end_span(_Config) -> SpanName1 = <<"span-1">>, - SpanCtx = ot_tracer:start_span(SpanName1), + SpanCtx = otel:start_span(SpanName1), ChildSpanName1 = <<"child-span-1">>, - ChildSpanCtx = ot_tracer:start_span(ChildSpanName1), + ChildSpanCtx = otel:start_span(ChildSpanName1), [ChildSpanData] = ets:lookup(?SPAN_TAB, ChildSpanCtx#span_ctx.span_id), ?assertEqual(ChildSpanName1, ChildSpanData#span.name), ?assertEqual(SpanCtx#span_ctx.span_id, ChildSpanData#span.parent_span_id), - ot_tracer:finish(), + otel:end_span(), %% wait until the sweeper sweeps away the parent span ?UNTIL(ets:tab2list(?SPAN_TAB) =:= []),