Skip to content

Commit

Permalink
Merge pull request #3749 from DataDog/anmarchenko/send_telemetry_in_b…
Browse files Browse the repository at this point in the history
…atch

[SDTEST-409] Send telemetry events in batches
  • Loading branch information
anmarchenko authored Jul 2, 2024
2 parents 7611172 + 8d02f3e commit 8081b9d
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 50 deletions.
99 changes: 73 additions & 26 deletions lib/datadog/core/telemetry/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
module Datadog
module Core
module Telemetry
# Collection of telemetry events
class Event
extend Core::Utils::Forking

# returns sequence that increments every time the configuration changes
def self.configuration_sequence
after_fork! { @sequence = Datadog::Core::Utils::Sequence.new(1) }
@sequence ||= Datadog::Core::Utils::Sequence.new(1)
end

# Base class for all Telemetry V2 events.
class Base
# The type of the event.
Expand All @@ -12,8 +21,7 @@ class Base
def type; end

# The JSON payload for the event.
# @param seq_id [Integer] The sequence ID for the event.
def payload(seq_id)
def payload
{}
end
end
Expand All @@ -24,8 +32,7 @@ def type
'app-started'
end

def payload(seq_id)
@seq_id = seq_id
def payload
{
products: products,
configuration: configuration,
Expand Down Expand Up @@ -80,16 +87,19 @@ def products
].freeze

# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength
def configuration
config = Datadog.configuration
seq_id = Event.configuration_sequence.next

list = [
conf_value('DD_AGENT_HOST', config.agent.host),
conf_value('DD_AGENT_TRANSPORT', agent_transport(config)),
conf_value('DD_TRACE_SAMPLE_RATE', to_value(config.tracing.sampling.default_rate)),
conf_value('DD_AGENT_HOST', config.agent.host, seq_id),
conf_value('DD_AGENT_TRANSPORT', agent_transport(config), seq_id),
conf_value('DD_TRACE_SAMPLE_RATE', to_value(config.tracing.sampling.default_rate), seq_id),
conf_value(
'DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED',
config.tracing.contrib.global_default_service_name.enabled
config.tracing.contrib.global_default_service_name.enabled,
seq_id
),
]

Expand All @@ -98,32 +108,45 @@ def configuration
peer_service_mapping = config.tracing.contrib.peer_service_mapping
peer_service_mapping_str = peer_service_mapping.map { |key, value| "#{key}:#{value}" }.join(',')
end
list << conf_value('DD_TRACE_PEER_SERVICE_MAPPING', peer_service_mapping_str)
list << conf_value('DD_TRACE_PEER_SERVICE_MAPPING', peer_service_mapping_str, seq_id)

# Whitelist of configuration options to send in additional payload object
TARGET_OPTIONS.each do |option|
split_option = option.split('.')
list << conf_value(option, to_value(config.dig(*split_option)))
list << conf_value(option, to_value(config.dig(*split_option)), seq_id)
end

# Add some more custom additional payload values here
list.push(
conf_value('tracing.auto_instrument.enabled', !defined?(Datadog::AutoInstrument::LOADED).nil?),
conf_value('tracing.writer_options.buffer_size', to_value(config.tracing.writer_options[:buffer_size])),
conf_value('tracing.writer_options.flush_interval', to_value(config.tracing.writer_options[:flush_interval])),
conf_value('tracing.opentelemetry.enabled', !defined?(Datadog::OpenTelemetry::LOADED).nil?),
conf_value('tracing.auto_instrument.enabled', !defined?(Datadog::AutoInstrument::LOADED).nil?, seq_id),
conf_value(
'tracing.writer_options.buffer_size',
to_value(config.tracing.writer_options[:buffer_size]),
seq_id
),
conf_value(
'tracing.writer_options.flush_interval',
to_value(config.tracing.writer_options[:flush_interval]),
seq_id
),
conf_value(
'tracing.opentelemetry.enabled',
!defined?(Datadog::OpenTelemetry::LOADED).nil?,
seq_id
),
)
list << conf_value('logger.instance', config.logger.instance.class.to_s) if config.logger.instance
list << conf_value('logger.instance', config.logger.instance.class.to_s, seq_id) if config.logger.instance
if config.respond_to?('appsec')
list << conf_value('appsec.enabled', config.dig('appsec', 'enabled'))
list << conf_value('appsec.sca_enabled', config.dig('appsec', 'sca_enabled'))
list << conf_value('appsec.enabled', config.dig('appsec', 'enabled'), seq_id)
list << conf_value('appsec.sca_enabled', config.dig('appsec', 'sca_enabled'), seq_id)
end
list << conf_value('ci.enabled', config.dig('ci', 'enabled')) if config.respond_to?('ci')
list << conf_value('ci.enabled', config.dig('ci', 'enabled'), seq_id) if config.respond_to?('ci')

list.reject! { |entry| entry[:value].nil? }
list
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength

def agent_transport(config)
adapter = Core::Configuration::AgentSettingsResolver.call(config).adapter
Expand All @@ -134,12 +157,12 @@ def agent_transport(config)
end
end

def conf_value(name, value, origin = 'code')
def conf_value(name, value, seq_id, origin = 'code')
{
name: name,
value: value,
origin: origin,
seq_id: @seq_id,
seq_id: seq_id,
}
end

Expand Down Expand Up @@ -169,7 +192,7 @@ def type
'app-dependencies-loaded'
end

def payload(seq_id)
def payload
{ dependencies: dependencies }
end

Expand All @@ -192,7 +215,7 @@ def type
'app-integrations-change'
end

def payload(seq_id)
def payload
{ integrations: integrations }
end

Expand Down Expand Up @@ -245,18 +268,20 @@ def initialize(changes, origin)
@origin = origin
end

def payload(seq_id)
{ configuration: configuration(seq_id) }
def payload
{ configuration: configuration }
end

def configuration(seq_id)
def configuration
config = Datadog.configuration
seq_id = Event.configuration_sequence.next

res = @changes.map do |name, value|
{
name: name,
value: value,
origin: @origin,
seq_id: seq_id,
}
end

Expand Down Expand Up @@ -299,7 +324,7 @@ def initialize(namespace, metric_series)
@metric_series = metric_series
end

def payload(_)
def payload
{
namespace: @namespace,
series: @metric_series.map(&:to_h)
Expand All @@ -313,6 +338,28 @@ def type
'distributions'
end
end

# Telemetry class for the 'message-batch' event
class MessageBatch
attr_reader :events

def type
'message-batch'
end

def initialize(events)
@events = events
end

def payload
@events.map do |event|
{
request_type: event.type,
payload: event.payload,
}
end
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/core/telemetry/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def build_payload(event, seq_id)
application: application,
debug: false,
host: host,
payload: event.payload(seq_id),
payload: event.payload,
request_type: event.type,
runtime_id: Core::Environment::Identity.id,
seq_id: seq_id,
Expand Down
6 changes: 2 additions & 4 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,11 @@ def perform(*events)
end

def flush_events(events)
return if events.nil?
return if events.nil? || events.empty?
return if !enabled? || !sent_started_event?

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
events.each do |event|
send_event(event)
end
send_event(Event::MessageBatch.new(events))
end

def heartbeat!
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/telemetry/emitter.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Datadog
module Core
module Telemetry
class Emitter
@sequence: Datadog::Core::Utils::Sequence
self.@sequence: Datadog::Core::Utils::Sequence

attr_reader http_transport: untyped

Expand Down
21 changes: 16 additions & 5 deletions sig/datadog/core/telemetry/event.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ module Datadog
module Core
module Telemetry
class Event
extend Core::Utils::Forking

self.@sequence: Datadog::Core::Utils::Sequence

def self.configuration_sequence: () -> Datadog::Core::Utils::Sequence

class Base
def payload: (int seq_id) -> Hash[Symbol, untyped]
def payload: () -> (Hash[Symbol, untyped] | Array[Hash[Symbol, untyped]])
def type: -> String?
end

class AppStarted < Base
TARGET_OPTIONS: Array[String]

@seq_id: int

private

def products: -> Hash[Symbol, untyped]
Expand All @@ -20,7 +24,7 @@ module Datadog

def agent_transport: (untyped config) -> String

def conf_value: (String name, Object value, ?String origin) -> Hash[Symbol, untyped]
def conf_value: (String name, Object value, Integer seq_id, ?String origin) -> Hash[Symbol, untyped]

def to_value: (Object value) -> Object

Expand All @@ -47,7 +51,7 @@ module Datadog

def initialize: (Enumerable[[String, Numeric | bool | String]] changes, String origin) -> void

def configuration: (int seq_id) -> Array[Hash[Symbol, untyped]]
def configuration: () -> Array[Hash[Symbol, untyped]]
end

class AppHeartbeat < Base
Expand All @@ -65,6 +69,13 @@ module Datadog

class Distributions < GenerateMetrics
end

class MessageBatch < Base
attr_reader events: Array[Datadog::Core::Telemetry::Event::Base]
@events: Array[Datadog::Core::Telemetry::Event::Base]

def initialize: (Array[Datadog::Core::Telemetry::Event::Base] events) -> void
end
end
end
end
Expand Down
32 changes: 30 additions & 2 deletions spec/datadog/core/telemetry/event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

RSpec.describe Datadog::Core::Telemetry::Event do
let(:id) { double('seq_id') }
subject(:payload) { event.payload(id) }
subject(:payload) { event.payload }

context 'AppStarted' do
let(:event) { described_class::AppStarted.new }
Expand All @@ -14,6 +14,8 @@
end

before do
allow_any_instance_of(Datadog::Core::Utils::Sequence).to receive(:next).and_return(id)

Datadog.configure do |c|
c.agent.host = '1.2.3.4'
c.tracing.sampling.default_rate = 0.5
Expand Down Expand Up @@ -164,12 +166,17 @@ def contain_configuration(*array)
let(:name) { 'key' }
let(:value) { 'value' }

before do
allow_any_instance_of(Datadog::Core::Utils::Sequence).to receive(:next).and_return(id)
end

it 'has a list of client configurations' do
is_expected.to eq(
configuration: [{
name: name,
value: value,
origin: origin,
seq_id: id
}]
)
end
Expand All @@ -185,7 +192,7 @@ def contain_configuration(*array)
is_expected.to eq(
configuration:
[
{ name: name, value: value, origin: origin },
{ name: name, value: value, origin: origin, seq_id: id },
{ name: 'appsec.sca_enabled', value: false, origin: 'code', seq_id: id }
]
)
Expand Down Expand Up @@ -252,4 +259,25 @@ def contain_configuration(*array)
)
end
end

context 'MessageBatch' do
let(:event) { described_class::MessageBatch.new(events) }

let(:events) { [described_class::AppClosing.new, described_class::AppHeartbeat.new] }

it do
is_expected.to eq(
[
{
request_type: 'app-closing',
payload: {}
},
{
request_type: 'app-heartbeat',
payload: {}
}
]
)
end
end
end
Loading

0 comments on commit 8081b9d

Please sign in to comment.