[SDTEST-409] Send telemetry events in batches #3749

Merged Jul 2, 2024 (8 commits)

Changes from 4 commits
22 changes: 22 additions & 0 deletions lib/datadog/core/telemetry/event.rb
@@ -313,6 +313,28 @@ def type
'distributions'
end
end

# Telemetry class for the 'message-batch' event
class MessageBatch
attr_reader :events

def type
'message-batch'
end

def initialize(events)
@events = events
end

def payload(seq_id)
@events.map do |event|
{
request_type: event.type,
payload: event.payload(seq_id),
Contributor:

Is it OK that the seq_id is the same for all of the events?

Member Author:

Great question, I need to look it up in the spec.

Member Author:

The spec says nothing about it and also it does not mention that seq_id should be part of telemetry events payloads at all (it is supposed to be part of common headers)! I will investigate why our payload generation code even depends on seq_id...

Member Author (@anmarchenko, Jul 2, 2024):

I found where this seq_id is defined in specs: https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/main/GeneratedDocumentation/ApiDocs/v2/SchemaDocumentation/Schemas/conf_key_value.md

This field should be incremented each time a new configuration key-value pair is applied in order to track the active set of configurations. It can be a global value across all configuration keys or local to each key.

This seq_id is in fact different from the seq_id that is used for events:
https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/main/GeneratedDocumentation/ApiDocs/v2/development.md#required-top-level-json-keys

Counter that should be auto incremented every time an API call is being made (our pipeline may not always work in a sequential order). This field will help us identify situations where some messages were being lost. seq_id should start at 1 so that the go backend can detect between non set fields.

I will see how to decouple them from each other in our implementation

Member Author:

Solved this by introducing another sequence in Core::Telemetry::Event class for the configuration changes

}
end
end
end
end
end
end
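To see the new event in isolation, here is a self-contained sketch of the MessageBatch behavior shown in the diff above. StubEvent is a hypothetical stand-in for the Event::Base subclasses; the real classes live in Datadog::Core::Telemetry::Event.

```ruby
# Hypothetical stand-in for Datadog::Core::Telemetry::Event::Base subclasses.
class StubEvent
  attr_reader :type

  def initialize(type)
    @type = type
  end

  def payload(_seq_id)
    {}
  end
end

# Mirrors the MessageBatch class added in the diff: it wraps a list of
# events and renders one payload entry per wrapped event.
class MessageBatch
  attr_reader :events

  def initialize(events)
    @events = events
  end

  def type
    'message-batch'
  end

  def payload(seq_id)
    @events.map do |event|
      { request_type: event.type, payload: event.payload(seq_id) }
    end
  end
end

batch = MessageBatch.new([StubEvent.new('app-closing'), StubEvent.new('app-heartbeat')])
# payload(1) returns an array with one {request_type:, payload:} hash per event.
pp batch.payload(1)
```

This is also exactly the shape the new event_spec.rb example below the diff asserts on.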
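The seq_id thread above was resolved by giving configuration changes their own counter. A minimal sketch of two independent monotonic sequences; the names here are illustrative, not the actual dd-trace-rb internals.

```ruby
# Illustrative sequence counter; dd-trace-rb has its own implementation.
class Sequence
  def initialize(start = 1)
    @current = start
  end

  # Returns the current value and advances the counter.
  def next
    value = @current
    @current += 1
    value
  end
end

# One counter per API request (the top-level seq_id, starting at 1)...
request_seq = Sequence.new
# ...and an independent one for applied configuration key-value pairs.
configuration_seq = Sequence.new

request_seq.next       # first API call -> 1
request_seq.next       # second API call -> 2
configuration_seq.next # first config change -> 1, unaffected by requests
```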
8 changes: 3 additions & 5 deletions lib/datadog/core/telemetry/worker.rb
@@ -82,19 +82,17 @@ def perform(*events)
end

def flush_events(events)
-      return if events.nil?
+      return if events.nil? || events.empty?
return if !enabled? || !sent_started_event?

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
-      events.each do |event|
-        send_event(event)
-      end
+      send_event(Event::MessageBatch.new(events))
end

def heartbeat!
return if !enabled? || !sent_started_event?

-      send_event(Event::AppHeartbeat.new)
+      enqueue(Event::AppHeartbeat.new)
end

def started!
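The worker change above collapses N requests per flush into one. A rough sketch of the before/after request counts, using a hypothetical counting emitter (CountingEmitter and SimpleBatch are illustrative names, not the real classes):

```ruby
# Hypothetical emitter that just counts requests, standing in for the
# real telemetry HTTP emitter.
class CountingEmitter
  attr_reader :requests

  def initialize
    @requests = 0
  end

  def request(_event)
    @requests += 1
  end
end

# Simplified batch wrapper, playing the role of Event::MessageBatch.
SimpleBatch = Struct.new(:events)

events = [:heartbeat, :integrations_change, :closing]

# Old behavior: one request per event.
old_emitter = CountingEmitter.new
events.each { |event| old_emitter.request(event) }

# New behavior: the whole flush becomes a single batched request
# (and empty flushes are skipped entirely, per the new guard clause).
new_emitter = CountingEmitter.new
new_emitter.request(SimpleBatch.new(events)) unless events.empty?

old_emitter.requests # three requests
new_emitter.requests # one request
```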
9 changes: 8 additions & 1 deletion sig/datadog/core/telemetry/event.rbs
@@ -3,7 +3,7 @@ module Datadog
module Telemetry
class Event
class Base
-          def payload: (int seq_id) -> Hash[Symbol, untyped]
+          def payload: (int seq_id) -> (Hash[Symbol, untyped] | Array[Hash[Symbol, untyped]])
def type: -> String?
end

@@ -65,6 +65,13 @@ module Datadog

class Distributions < GenerateMetrics
end

class MessageBatch < Base
attr_reader events: Array[Datadog::Core::Telemetry::Event::Base]
@events: Array[Datadog::Core::Telemetry::Event::Base]

def initialize: (Array[Datadog::Core::Telemetry::Event::Base] events) -> void
end
end
end
end
21 changes: 21 additions & 0 deletions spec/datadog/core/telemetry/event_spec.rb
@@ -252,4 +252,25 @@ def contain_configuration(*array)
)
end
end

context 'MessageBatch' do
let(:event) { described_class::MessageBatch.new(events) }

let(:events) { [described_class::AppClosing.new, described_class::AppHeartbeat.new] }

it do
is_expected.to eq(
[
{
request_type: 'app-closing',
payload: {}
},
{
request_type: 'app-heartbeat',
payload: {}
}
]
)
end
end
end
30 changes: 21 additions & 9 deletions spec/datadog/core/telemetry/worker_spec.rb
@@ -40,8 +40,10 @@
response
end

-        allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do
-          @received_heartbeat = true
+        allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::MessageBatch)) do |event|
+          subevent = event.events.first
+
+          @received_heartbeat = true if subevent.is_a?(Datadog::Core::Telemetry::Event::AppHeartbeat)

response
end
@@ -106,7 +108,7 @@

it 'always sends heartbeat event after started event' do
sent_hearbeat = false
-        allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do
+        allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::MessageBatch)) do
# app-started was already sent by now
expect(worker.sent_started_event?).to be(true)

@@ -138,7 +140,7 @@
end

sent_hearbeat = false
-        allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do
+        allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::MessageBatch)) do
# app-started was already sent by now
expect(@received_started).to be(true)

@@ -222,15 +224,20 @@
context 'several workers running' do
it 'sends single started event' do
started_events = 0
+        mutex = Mutex.new
allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) do
started_events += 1

response
end

heartbeat_events = 0
-        allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do
-          heartbeat_events += 1
+        allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::MessageBatch)) do |event|
+          event.events.each do |subevent|
+            mutex.synchronize do
+              heartbeat_events += 1 if subevent.is_a?(Datadog::Core::Telemetry::Event::AppHeartbeat)
+            end
+          end

response
end
@@ -284,10 +291,15 @@
describe '#enqueue' do
it 'adds events to the buffer and flushes them later' do
events_received = 0
+      mutex = Mutex.new
allow(emitter).to receive(:request).with(
-        an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange)
-      ) do
-        events_received += 1
+        an_instance_of(Datadog::Core::Telemetry::Event::MessageBatch)
+      ) do |event|
+        event.events.each do |subevent|
+          mutex.synchronize do
+            events_received += 1 if subevent.is_a?(Datadog::Core::Telemetry::Event::AppIntegrationsChange)
+          end
+        end

response
end
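The spec updates above guard their counters with a Mutex because batches are now delivered from the worker's background thread, and incrementing a shared counter from several threads without one can drop updates. A minimal illustration of the pattern:

```ruby
counter = 0
mutex = Mutex.new

# Four threads each perform 1,000 guarded increments.
threads = 4.times.map do
  Thread.new do
    1_000.times { mutex.synchronize { counter += 1 } }
  end
end
threads.each(&:join)

counter # 4000 on every run; without the mutex the total could fall short
```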