diff --git a/lib/datadog/core/configuration/components.rb b/lib/datadog/core/configuration/components.rb index f7e00d7a98..46106accc1 100644 --- a/lib/datadog/core/configuration/components.rb +++ b/lib/datadog/core/configuration/components.rb @@ -169,8 +169,9 @@ def shutdown!(replacement = nil) unused_statsd = (old_statsd - (old_statsd & new_statsd)) unused_statsd.each(&:close) - telemetry.stop! + # enqueue closing event before stopping telemetry so it will be sent out on shutdown telemetry.emit_closing! unless replacement + telemetry.stop! end end end diff --git a/lib/datadog/core/telemetry/client.rb b/lib/datadog/core/telemetry/client.rb index 07028e96ad..05a96e3923 100644 --- a/lib/datadog/core/telemetry/client.rb +++ b/lib/datadog/core/telemetry/client.rb @@ -19,7 +19,6 @@ class Client # @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true) @enabled = enabled - @emitter = Emitter.new @stopped = false @started = false @dependency_collection = dependency_collection @@ -27,7 +26,7 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru @worker = Telemetry::Worker.new( enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, - emitter: @emitter + emitter: Emitter.new ) end @@ -41,7 +40,7 @@ def started! @worker.start - @emitter.request(Event::AppDependenciesLoaded.new) if @dependency_collection + @worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection @started = true end @@ -49,27 +48,28 @@ def started! def emit_closing! return if !@enabled || forked? - @emitter.request(Event::AppClosing.new) + @worker.enqueue(Event::AppClosing.new) end def stop! return if @stopped - @worker.stop(true, 0) + # gracefully stop the worker and send leftover events + @worker.stop @stopped = true end def integrations_change! return if !@enabled || forked? 
- @emitter.request(Event::AppIntegrationsChange.new) + @worker.enqueue(Event::AppIntegrationsChange.new) end # Report configuration changes caused by Remote Configuration. def client_configuration_change!(changes) return if !@enabled || forked? - @emitter.request(Event::AppClientConfigurationChange.new(changes, 'remote_config')) + @worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config')) end end end diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index a837259c3f..93fccab9a6 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -13,7 +13,15 @@ class Worker include Core::Workers::Queue include Core::Workers::Polling - def initialize(heartbeat_interval_seconds:, emitter:, enabled: true) + DEFAULT_BUFFER_MAX_SIZE = 1000 + + def initialize( + heartbeat_interval_seconds:, + emitter:, + enabled: true, + shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, + buffer_size: DEFAULT_BUFFER_MAX_SIZE + ) @emitter = emitter @sent_started_event = false @@ -23,6 +31,11 @@ def initialize(heartbeat_interval_seconds:, emitter:, enabled: true) # Workers::IntervalLoop settings self.loop_base_interval = heartbeat_interval_seconds self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP + + @shutdown_timeout = shutdown_timeout + @buffer_size = buffer_size + + self.buffer = buffer_klass.new(@buffer_size) end def start @@ -32,6 +45,16 @@ def start perform end + def stop(force_stop = false, timeout = @shutdown_timeout) + buffer.close if running? + + super + end + + def enqueue(event) + buffer.push(event) + end + def sent_started_event? 
@sent_started_event end @@ -85,6 +108,18 @@ def send_event(event) Datadog.logger.debug { "Received response: #{response}" } response end + + def dequeue + buffer.pop + end + + def buffer_klass + if Core::Environment::Ext::RUBY_ENGINE == 'ruby' + Core::Buffer::CRuby + else + Core::Buffer::ThreadSafe + end + end end end end diff --git a/sig/datadog/core/telemetry/client.rbs b/sig/datadog/core/telemetry/client.rbs index e1646cb1eb..007050163a 100644 --- a/sig/datadog/core/telemetry/client.rbs +++ b/sig/datadog/core/telemetry/client.rbs @@ -6,14 +6,13 @@ module Datadog @dependency_collection: bool @started: bool @stopped: bool - @emitter: Datadog::Core::Telemetry::Emitter @worker: Datadog::Core::Telemetry::Worker attr_reader enabled: bool include Core::Utils::Forking - def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, enabled: bool) -> void + def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void def disable!: () -> void diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 64b3de884d..9220dfeea0 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -8,15 +8,21 @@ module Datadog include Core::Workers::IntervalLoop include Core::Workers::Queue + DEFAULT_BUFFER_MAX_SIZE: 1000 + @emitter: Emitter @sent_started_event: bool + @shutdown_timeout: Integer + @buffer_size: Integer - def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter) -> void + def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer) -> void def start: () -> void def sent_started_event?: () -> bool + def enqueue: (Event::Base event) -> void + private def heartbeat!: () -> void @@ -26,6 +32,8 @@ module Datadog def flush_events: (Array[Event::Base] events) -> void def send_event: (Event::Base event) -> 
Datadog::Core::Telemetry::Http::Adapters::Net::Response + + def buffer_klass: () -> untyped end end end diff --git a/sig/datadog/core/workers/polling.rbs b/sig/datadog/core/workers/polling.rbs index 7f4d8f9c55..43c1360a92 100644 --- a/sig/datadog/core/workers/polling.rbs +++ b/sig/datadog/core/workers/polling.rbs @@ -2,7 +2,7 @@ module Datadog module Core module Workers module Polling - SHUTDOWN_TIMEOUT: 1 + DEFAULT_SHUTDOWN_TIMEOUT: 1 def self.included: (Class | Module base) -> void diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/client_spec.rb index ed5d991feb..2e1a0a34bc 100644 --- a/spec/datadog/core/telemetry/client_spec.rb +++ b/spec/datadog/core/telemetry/client_spec.rb @@ -14,15 +14,15 @@ let(:enabled) { true } let(:heartbeat_interval_seconds) { 0 } let(:dependency_collection) { true } - let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } - let(:response) { double(Datadog::Core::Telemetry::Http::Adapters::Net::Response) } + let(:worker) { double(Datadog::Core::Telemetry::Worker) } let(:not_found) { false } before do - allow(Datadog::Core::Telemetry::Emitter).to receive(:new).and_return(emitter) - allow(emitter).to receive(:request).and_return(response) - allow(response).to receive(:not_found?).and_return(not_found) - allow(response).to receive(:ok?).and_return(!not_found) + allow(Datadog::Core::Telemetry::Worker).to receive(:new).and_return(worker) + allow(worker).to receive(:start) + allow(worker).to receive(:enqueue) + allow(worker).to receive(:stop) + allow(worker).to receive(:"enabled=") end describe '#initialize' do @@ -62,6 +62,12 @@ end it { expect { client.disable! }.to change { client.enabled }.from(true).to(false) } + + it 'disables worker' do + client.disable! + + expect(worker).to have_received(:"enabled=").with(false) + end end describe '#started!' do @@ -75,7 +81,8 @@ let(:enabled) { false } it do started! 
- expect(emitter).to_not have_received(:request) + + expect(worker).to_not have_received(:start) end end @@ -84,13 +91,11 @@ context 'when dependency_collection is true' do it do - dependencies = double - allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) - .to receive(:new).with(no_args).and_return(dependencies) - started! - expect(emitter).to have_received(:request).with(dependencies) + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) + ) end end @@ -98,13 +103,9 @@ let(:dependency_collection) { false } it do - dependencies = double - allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) - .to receive(:new).with(no_args).and_return(dependencies) - started! - expect(emitter).to_not have_received(:request).with(dependencies) + expect(worker).not_to have_received(:enqueue) end end end @@ -115,8 +116,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).to_not have_received(:start) end end end @@ -133,21 +135,20 @@ let(:enabled) { false } it do emit_closing! - expect(emitter).to_not have_received(:request) + + expect(worker).not_to have_received(:enqueue) end end context 'when enabled' do let(:enabled) { true } it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppClosing).to receive(:new).with(no_args).and_return(double) - emit_closing! - expect(emitter).to have_received(:request).with(double) - end - it { is_expected.to be(response) } + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppClosing) + ) + end end context 'when in fork' do @@ -156,8 +157,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).not_to have_received(:enqueue) end end end @@ -165,32 +167,12 @@ describe '#stop!' do subject(:stop!) { client.stop! 
} - let(:worker) { instance_double(Datadog::Core::Telemetry::Worker) } - - before do - allow(Datadog::Core::Telemetry::Worker).to receive(:new) - .with(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, emitter: emitter) - .and_return(worker) - allow(worker).to receive(:start) - allow(worker).to receive(:stop) - end - context 'when disabled' do - let(:enabled) { false } - it 'does not raise error' do - stop! - end - end + it 'stops worker once' do + stop! + stop! - context 'when enabled' do - let(:enabled) { true } - - context 'when stop! has been called already' do - it 'does not raise error' do - stop! - stop! - end - end + expect(worker).to have_received(:stop).once end end @@ -205,21 +187,20 @@ let(:enabled) { false } it do integrations_change! - expect(emitter).to_not have_received(:request) + + expect(worker).not_to have_received(:enqueue) end end context 'when enabled' do let(:enabled) { true } it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppIntegrationsChange).to receive(:new).with(no_args).and_return(double) - integrations_change! - expect(emitter).to have_received(:request).with(double) - end - it { is_expected.to be(response) } + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) + end end context 'when in fork' do @@ -228,8 +209,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).not_to have_received(:enqueue) end end end @@ -247,24 +229,20 @@ let(:enabled) { false } it do client_configuration_change! - expect(emitter).to_not have_received(:request) + + expect(worker).not_to have_received(:enqueue) end end context 'when enabled' do let(:enabled) { true } it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppClientConfigurationChange).to receive(:new).with( - changes, - 'remote_config' - ).and_return(double) - client_configuration_change! 
- expect(emitter).to have_received(:request).with(double) - end - it { is_expected.to be(response) } + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppClientConfigurationChange) + ) + end end context 'when in fork' do @@ -273,8 +251,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).not_to have_received(:enqueue) end end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 8b8aba3178..ba389c2380 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -11,10 +11,34 @@ let(:heartbeat_interval_seconds) { 0.5 } let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } + let(:backend_supports_telemetry?) { true } + let(:response) do + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: !backend_supports_telemetry?, + ok?: backend_supports_telemetry? + ) + end + before do logger = double(Datadog::Core::Logger) allow(logger).to receive(:debug).with(any_args) allow(Datadog).to receive(:logger).and_return(logger) + + @received_started = false + @received_heartbeat = false + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do + @received_started = true + + response + end + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + @received_heartbeat = true + + response + end end after do @@ -36,31 +60,6 @@ describe '#start' do context 'when enabled' do - let(:response) do - double( - Datadog::Core::Telemetry::Http::Adapters::Net::Response, - not_found?: !backend_supports_telemetry?, - ok?: backend_supports_telemetry? 
- ) - end - - before do - @received_started = false - @received_heartbeat = false - - allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do - @received_started = true - - response - end - - allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do - @received_heartbeat = true - - response - end - end - context "when backend doesn't support telemetry" do let(:backend_supports_telemetry?) { false } @@ -137,4 +136,24 @@ end end end + + describe '#enqueue' do + it 'adds events to the buffer and flushes them later' do + events_received = 0 + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) do + events_received += 1 + end + + worker.start + + events_sent = 3 + events_sent.times do + worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new) + end + + try_wait_until { events_received == events_sent } + end + end end