Skip to content

Commit

Permalink
aggregator add impl
Browse files Browse the repository at this point in the history
  • Loading branch information
sl0thentr0py committed Feb 21, 2024
1 parent eb3e7a4 commit ac951a0
Showing 1 changed file with 37 additions and 7 deletions.
44 changes: 37 additions & 7 deletions sentry-ruby/lib/sentry/metrics/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ class Aggregator
FLUSH_INTERVAL = 5
ROLLUP_IN_SECONDS = 10

METRIC_TYPES = {
c: CounterMetric,
d: DistributionMetric,
g: GaugeMetric,
s: SetMetric
}

def initialize(configuration, client)
@client = client
@logger = configuration.logger
Expand All @@ -16,8 +23,13 @@ def initialize(configuration, client)

@thread = nil
@exited = false
@mutex = Mutex.new

# buckets are a nested hash of timestamp -> bucket keys -> Metric instance
@buckets = {}

# the flush interval needs to be shifted once per startup to create jittering
@flush_shift = Random.rand * ROLLUP_IN_SECONDS
end

def add(type,
Expand All @@ -34,19 +46,30 @@ def add(type,
# this is integer division and thus takes the floor of the division
# and buckets into 10 second intervals
bucket_timestamp = (timestamp / ROLLUP_IN_SECONDS) * ROLLUP_IN_SECONDS

serialized_tags = serialize_tags(tags)
bucket_key = [type, key, unit, serialized_tags]

# TODO lock and add to bucket
42
@mutex.synchronize do
@buckets[bucket_timestamp] ||= {}

if @buckets[bucket_timestamp][bucket_key]
@buckets[bucket_timestamp][bucket_key].add(value)
else
@buckets[bucket_timestamp][bucket_key] = METRIC_TYPES[type].new(value)
end
end
end

def flush
# TODO
@mutex.synchronize do
log_debug("[Metrics::Aggregator] current bucket state: #{@buckets}")
# TODO
end
end

def kill
log_debug("[Metrics::Aggregator] killing thread")
log_debug('[Metrics::Aggregator] killing thread')

@exited = true
@thread&.kill
Expand All @@ -68,15 +91,22 @@ def ensure_thread

true
rescue ThreadError
log_debug("[Metrics::Aggregator] thread creation failed")
log_debug('[Metrics::Aggregator] thread creation failed')
@exited = true
false
end

def serialize_tags(tags)
# TODO support array tags
return [] unless tags
tags.map { |k, v| [k.to_s, v.to_s] }

# important to sort for key consistency
tags.map do |k, v|
if v.is_a?(Array)
v.map { |x| [k.to_s, x.to_s] }
else
[k.to_s, v.to_s]
end
end.flatten.sort
end
end
end
Expand Down

0 comments on commit ac951a0

Please sign in to comment.