Add Metrics API and aggregator #2247

Merged
merged 17 commits on Mar 12, 2024
32 changes: 32 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,38 @@
- Add support for distributed tracing in `sentry-delayed_job` [#2233](https://github.com/getsentry/sentry-ruby/pull/2233)
- Fix warning about default gems on Ruby 3.3.0 ([#2225](https://github.com/getsentry/sentry-ruby/pull/2225))
- Add `hint:` support to `Sentry::Rails::ErrorSubscriber` [#2235](https://github.com/getsentry/sentry-ruby/pull/2235)
- Add [Metrics](https://docs.sentry.io/product/metrics/) support
- Add main APIs and `Aggregator` thread [#2247](https://github.com/getsentry/sentry-ruby/pull/2247)

The SDK now supports recording and aggregating metrics. A new thread will be started
for aggregation and will flush the pending data to Sentry every 5 seconds.

To enable this behavior, use:

```ruby
Sentry.init do |config|
# ...
config.metrics.enabled = true
end
```

And then in your application code, collect metrics as follows:

```ruby
# increment a simple counter
Sentry::Metrics.increment('button_click')
# set a value, unit and tags
Sentry::Metrics.increment('time', 5, unit: 'second', tags: { browser: 'firefox' })

# distribution - get statistical aggregates from an array of observations
Sentry::Metrics.distribution('page_load', 15.0, unit: 'millisecond')

# gauge - record statistical aggregates directly on the SDK, more space efficient
Sentry::Metrics.gauge('page_load', 15.0, unit: 'millisecond')

# set - get unique counts of elements
Sentry::Metrics.set('user_view', 'jane')
```

### Bug Fixes

12 changes: 12 additions & 0 deletions sentry-ruby/lib/sentry-ruby.rb
@@ -23,6 +23,7 @@
require "sentry/session_flusher"
require "sentry/backpressure_monitor"
require "sentry/cron/monitor_check_ins"
require "sentry/metrics"

[
"sentry/rake",
@@ -77,6 +78,10 @@ def exception_locals_tp
# @return [BackpressureMonitor, nil]
attr_reader :backpressure_monitor

# @!attribute [r] metrics_aggregator
# @return [Metrics::Aggregator, nil]
attr_reader :metrics_aggregator

##### Patch Registration #####

# @!visibility private
@@ -224,6 +229,7 @@ def init(&block)
@background_worker = Sentry::BackgroundWorker.new(config)
@session_flusher = config.session_tracking? ? Sentry::SessionFlusher.new(config, client) : nil
@backpressure_monitor = config.enable_backpressure_handling ? Sentry::BackpressureMonitor.new(config, client) : nil
@metrics_aggregator = config.metrics.enabled ? Sentry::Metrics::Aggregator.new(config, client) : nil
exception_locals_tp.enable if config.include_local_variables
at_exit { close }
end
@@ -244,6 +250,12 @@ def close
@backpressure_monitor = nil
end

if @metrics_aggregator
@metrics_aggregator.flush(force: true)
@metrics_aggregator.kill
@metrics_aggregator = nil
end

if client = get_current_client
client.transport.flush

6 changes: 6 additions & 0 deletions sentry-ruby/lib/sentry/configuration.rb
@@ -8,6 +8,7 @@
require "sentry/release_detector"
require "sentry/transport/configuration"
require "sentry/cron/configuration"
require "sentry/metrics/configuration"
require "sentry/linecache"
require "sentry/interfaces/stacktrace_builder"

@@ -235,6 +236,10 @@ def capture_exception_frame_locals=(value)
# @return [Cron::Configuration]
attr_reader :cron

# Metrics related configuration.
# @return [Metrics::Configuration]
attr_reader :metrics

# Take a float between 0.0 and 1.0 as the sample rate for tracing events (transactions).
# @return [Float, nil]
attr_reader :traces_sample_rate
@@ -386,6 +391,7 @@ def initialize

@transport = Transport::Configuration.new
@cron = Cron::Configuration.new
@metrics = Metrics::Configuration.new
@gem_specs = Hash[Gem::Specification.map { |spec| [spec.name, spec.version.to_s] }] if Gem::Specification.respond_to?(:map)

run_post_initialization_callbacks
11 changes: 6 additions & 5 deletions sentry-ruby/lib/sentry/envelope.rb
@@ -7,19 +7,20 @@ class Item
STACKTRACE_FRAME_LIMIT_ON_OVERSIZED_PAYLOAD = 500
MAX_SERIALIZED_PAYLOAD_SIZE = 1024 * 1000

attr_accessor :headers, :payload
attr_accessor :headers, :payload, :is_json

def initialize(headers, payload)
def initialize(headers, payload, is_json: true)
@headers = headers
@payload = payload
@is_json = is_json
end

def type
@headers[:type] || 'event'
end

def to_s
[JSON.generate(@headers), JSON.generate(@payload)].join("\n")
[JSON.generate(@headers), @is_json ? JSON.generate(@payload) : @payload].join("\n")
end

def serialize
@@ -78,8 +79,8 @@ def initialize(headers = {})
@items = []
end

def add_item(headers, payload)
@items << Item.new(headers, payload)
def add_item(headers, payload, is_json: true)
@items << Item.new(headers, payload, is_json: is_json)
end

def item_types
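For context, a minimal sketch of how the new `is_json: false` path is meant to be exercised, assuming the existing `Envelope` API; the payload string here is illustrative:

```ruby
# A pre-serialized statsd payload is attached verbatim: Item#to_s now only
# JSON-encodes the item headers and passes the payload through untouched.
payload = "button_click@none:1.0|c|#environment:production|T1709999980"

envelope = Sentry::Envelope.new
envelope.add_item(
  { type: 'statsd', length: payload.bytesize },
  payload,
  is_json: false
)
```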
30 changes: 30 additions & 0 deletions sentry-ruby/lib/sentry/metrics.rb
@@ -0,0 +1,30 @@
# frozen_string_literal: true

require 'sentry/metrics/metric'
require 'sentry/metrics/counter_metric'
require 'sentry/metrics/distribution_metric'
require 'sentry/metrics/gauge_metric'
require 'sentry/metrics/set_metric'
require 'sentry/metrics/aggregator'

module Sentry
module Metrics
class << self
def increment(key, value = 1.0, unit: 'none', tags: {}, timestamp: nil)
Sentry.metrics_aggregator&.add(:c, key, value, unit: unit, tags: tags, timestamp: timestamp)
end

def distribution(key, value, unit: 'none', tags: {}, timestamp: nil)
Sentry.metrics_aggregator&.add(:d, key, value, unit: unit, tags: tags, timestamp: timestamp)
end

def set(key, value, unit: 'none', tags: {}, timestamp: nil)
Sentry.metrics_aggregator&.add(:s, key, value, unit: unit, tags: tags, timestamp: timestamp)
end

def gauge(key, value, unit: 'none', tags: {}, timestamp: nil)
Sentry.metrics_aggregator&.add(:g, key, value, unit: unit, tags: tags, timestamp: timestamp)
end
end
end
end
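A short usage sketch of the module-level API added in this file (metric names, values, and tags are illustrative). Each helper forwards to `Sentry.metrics_aggregator&.add` with a fixed type symbol, so every call is a safe no-op when metrics are disabled and the aggregator is `nil`:

```ruby
Sentry::Metrics.increment('checkout_clicks', 1, tags: { page: 'cart' })  # type :c
Sentry::Metrics.distribution('db_query', 23.5, unit: 'millisecond')      # type :d
Sentry::Metrics.set('users_seen', 'jane')                                 # type :s
Sentry::Metrics.gauge('queue_depth', 42, timestamp: Time.now)             # type :g
```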
185 changes: 185 additions & 0 deletions sentry-ruby/lib/sentry/metrics/aggregator.rb
@@ -0,0 +1,185 @@
# frozen_string_literal: true

module Sentry
module Metrics
class Aggregator
include LoggingHelper

FLUSH_INTERVAL = 5
ROLLUP_IN_SECONDS = 10

KEY_SANITIZATION_REGEX = /[^a-zA-Z0-9_\/.-]+/
VALUE_SANITIZATION_REGEX = /[^[[:word:]][[:digit:]][[:space:]]_:\/@\.{}\[\]$-]+/

METRIC_TYPES = {
c: CounterMetric,
d: DistributionMetric,
g: GaugeMetric,
s: SetMetric
}

# exposed only for testing
attr_reader :thread, :buckets, :flush_shift

def initialize(configuration, client)
@client = client
@logger = configuration.logger

@default_tags = {}
@default_tags['release'] = configuration.release if configuration.release
@default_tags['environment'] = configuration.environment if configuration.environment

@thread = nil
@exited = false
@mutex = Mutex.new

# buckets are a nested hash of timestamp -> bucket keys -> Metric instance
@buckets = {}

# the flush interval needs to be shifted once per startup to create jittering
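# e.g. a shift of ~3.7s means a bucket only becomes flushable once it is
# ~13.7 seconds old (see #get_flushable_buckets!)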
@flush_shift = Random.rand * ROLLUP_IN_SECONDS
end

def add(type,
key,
value,
unit: 'none',
tags: {},
timestamp: nil)
return unless ensure_thread
return unless METRIC_TYPES.keys.include?(type)

timestamp = timestamp.to_i if timestamp.is_a?(Time)
timestamp ||= Sentry.utc_now.to_i

# this is integer division and thus takes the floor of the division
# and buckets into 10 second intervals
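# e.g. with ROLLUP_IN_SECONDS = 10, a timestamp of 1709999987 falls into the 1709999980 bucket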
bucket_timestamp = (timestamp / ROLLUP_IN_SECONDS) * ROLLUP_IN_SECONDS

serialized_tags = serialize_tags(get_updated_tags(tags))
bucket_key = [type, key, unit, serialized_tags]

@mutex.synchronize do
@buckets[bucket_timestamp] ||= {}

if @buckets[bucket_timestamp][bucket_key]
@buckets[bucket_timestamp][bucket_key].add(value)
else
@buckets[bucket_timestamp][bucket_key] = METRIC_TYPES[type].new(value)
end
end
end

def flush(force: false)
flushable_buckets = get_flushable_buckets!(force)
return if flushable_buckets.empty?

payload = serialize_buckets(flushable_buckets)
envelope = Envelope.new
envelope.add_item(
{ type: 'statsd', length: payload.bytesize },
payload,
is_json: false
)

Sentry.background_worker.perform do
@client.transport.send_envelope(envelope)
end
end

def kill
log_debug('[Metrics::Aggregator] killing thread')

@exited = true
@thread&.kill
end

private

def ensure_thread
return false if @exited
return true if @thread&.alive?

@thread = Thread.new do
loop do
# TODO-neel-metrics use event for force flush later
sleep(FLUSH_INTERVAL)
flush

Codecov warning (codecov/patch): added line #L107 in sentry-ruby/lib/sentry/metrics/aggregator.rb was not covered by tests
end
end

true
rescue ThreadError
log_debug('[Metrics::Aggregator] thread creation failed')
@exited = true
false
end

# important to sort for key consistency
def serialize_tags(tags)
tags.flat_map do |k, v|
if v.is_a?(Array)
v.map { |x| [k.to_s, x.to_s] }
else
[[k.to_s, v.to_s]]
end
end.sort
end

def get_flushable_buckets!(force)
@mutex.synchronize do
flushable_buckets = {}

if force
flushable_buckets = @buckets
@buckets = {}
else
cutoff = Sentry.utc_now.to_i - ROLLUP_IN_SECONDS - @flush_shift
flushable_buckets = @buckets.select { |k, _| k <= cutoff }
@buckets.reject! { |k, _| k <= cutoff }
end

flushable_buckets
end
end

# serialize buckets to statsd format
def serialize_buckets(buckets)
buckets.map do |timestamp, timestamp_buckets|
timestamp_buckets.map do |metric_key, metric|
type, key, unit, tags = metric_key
values = metric.serialize.join(':')
sanitized_tags = tags.map { |k, v| "#{sanitize_key(k)}:#{sanitize_value(v)}" }.join(',')

"#{sanitize_key(key)}@#{unit}:#{values}|#{type}|\##{sanitized_tags}|T#{timestamp}"
end
end.flatten.join("\n")
end

def sanitize_key(key)
key.gsub(KEY_SANITIZATION_REGEX, '_')
end

def sanitize_value(value)
value.gsub(VALUE_SANITIZATION_REGEX, '')
end

def get_transaction_name
scope = Sentry.get_current_scope
return nil unless scope && scope.transaction_name
return nil if scope.transaction_source_low_quality?

scope.transaction_name
end

def get_updated_tags(tags)
updated_tags = @default_tags.merge(tags)

transaction_name = get_transaction_name
updated_tags['transaction'] = transaction_name if transaction_name

updated_tags
end
end
end
end
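For reference, each bucket serialized by `serialize_buckets` becomes one statsd-style line of the form `key@unit:value[:value...]|type|#tag:value,...|T<bucket_timestamp>`. A rough sketch of a flushed payload, with illustrative names, tags, and timestamps:

```ruby
# Two metrics from the same 10-second bucket: a distribution keeps every
# observation (joined with ':'), a counter collapses to a single total.
payload = <<~STATSD
  page_load@millisecond:15.0:17.2|d|#environment:production,release:1.0.0|T1709999980
  button_click@none:3.0|c|#environment:production,release:1.0.0|T1709999980
STATSD
```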
17 changes: 17 additions & 0 deletions sentry-ruby/lib/sentry/metrics/configuration.rb
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Sentry
module Metrics
class Configuration
# Enable metrics usage
# Starts a new {Sentry::Metrics::Aggregator} instance to aggregate metrics
# and a thread to aggregate and flush them every 5 seconds.
# @return [Boolean]
attr_accessor :enabled

def initialize
@enabled = false
end
end
end
end