Skip to content

Commit

Permalink
enqueue events to be sent later by worker instead of sending them syn…
Browse files Browse the repository at this point in the history
…chronously
  • Loading branch information
anmarchenko committed Jun 14, 2024
1 parent a248e25 commit c03e392
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 107 deletions.
3 changes: 2 additions & 1 deletion lib/datadog/core/configuration/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ def shutdown!(replacement = nil)
unused_statsd = (old_statsd - (old_statsd & new_statsd))
unused_statsd.each(&:close)

telemetry.stop!
# enqueue closing event before stopping telemetry so it will be send out on shutdown
telemetry.emit_closing! unless replacement
telemetry.stop!
end
end
end
Expand Down
14 changes: 7 additions & 7 deletions lib/datadog/core/telemetry/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ class Client
# @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event
def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true)
@enabled = enabled
@emitter = Emitter.new
@stopped = false
@started = false
@dependency_collection = dependency_collection

@worker = Telemetry::Worker.new(
enabled: @enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds,
emitter: @emitter
emitter: Emitter.new
)
end

Expand All @@ -41,35 +40,36 @@ def started!

@worker.start

@emitter.request(Event::AppDependenciesLoaded.new) if @dependency_collection
@worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection

@started = true
end

def emit_closing!
return if !@enabled || forked?

@emitter.request(Event::AppClosing.new)
@worker.enqueue(Event::AppClosing.new)
end

def stop!
return if @stopped

@worker.stop(true, 0)
# gracefully stop the worker and send leftover events
@worker.stop
@stopped = true
end

def integrations_change!
return if !@enabled || forked?

@emitter.request(Event::AppIntegrationsChange.new)
@worker.enqueue(Event::AppIntegrationsChange.new)
end

# Report configuration changes caused by Remote Configuration.
def client_configuration_change!(changes)
return if !@enabled || forked?

@emitter.request(Event::AppClientConfigurationChange.new(changes, 'remote_config'))
@worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config'))
end
end
end
Expand Down
37 changes: 36 additions & 1 deletion lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ class Worker
include Core::Workers::Queue
include Core::Workers::Polling

def initialize(heartbeat_interval_seconds:, emitter:, enabled: true)
DEFAULT_BUFFER_MAX_SIZE = 1000

def initialize(
heartbeat_interval_seconds:,
emitter:,
enabled: true,
shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
@emitter = emitter

@sent_started_event = false
Expand All @@ -23,6 +31,11 @@ def initialize(heartbeat_interval_seconds:, emitter:, enabled: true)
# Workers::IntervalLoop settings
self.loop_base_interval = heartbeat_interval_seconds
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

@shutdown_timeout = shutdown_timeout
@buffer_size = buffer_size

self.buffer = buffer_klass.new(@buffer_size)
end

def start
Expand All @@ -32,6 +45,16 @@ def start
perform
end

def stop(force_stop = false, timeout = @shutdown_timeout)
buffer.close if running?

super
end

def enqueue(event)
buffer.push(event)
end

def sent_started_event?
@sent_started_event
end
Expand Down Expand Up @@ -85,6 +108,18 @@ def send_event(event)
Datadog.logger.debug { "Received response: #{response}" }
response
end

def dequeue
buffer.pop
end

def buffer_klass
if Core::Environment::Ext::RUBY_ENGINE == 'ruby'
Core::Buffer::CRuby
else
Core::Buffer::ThreadSafe
end
end
end
end
end
Expand Down
3 changes: 1 addition & 2 deletions sig/datadog/core/telemetry/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ module Datadog
@dependency_collection: bool
@started: bool
@stopped: bool
@emitter: Datadog::Core::Telemetry::Emitter
@worker: Datadog::Core::Telemetry::Worker

attr_reader enabled: bool

include Core::Utils::Forking

def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, enabled: bool) -> void
def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void

def disable!: () -> void

Expand Down
10 changes: 9 additions & 1 deletion sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@ module Datadog
include Core::Workers::IntervalLoop
include Core::Workers::Queue

DEFAULT_BUFFER_MAX_SIZE: 1000

@emitter: Emitter
@sent_started_event: bool
@shutdown_timeout: Integer
@buffer_size: Integer

def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter) -> void
def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer) -> void

def start: () -> void

def sent_started_event?: () -> bool

def enqueue: (Event::Base event) -> void

private

def heartbeat!: () -> void
Expand All @@ -26,6 +32,8 @@ module Datadog
def flush_events: (Array[Event::Base] events) -> void

def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response

def buffer_klass: () -> untyped
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/workers/polling.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Datadog
module Core
module Workers
module Polling
SHUTDOWN_TIMEOUT: 1
DEFAULT_SHUTDOWN_TIMEOUT: 1

def self.included: (Class | Module base) -> void

Expand Down
Loading

0 comments on commit c03e392

Please sign in to comment.