-
Notifications
You must be signed in to change notification settings - Fork 134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[POC] Client side aggregation. #139
Conversation
c320bd6
to
c1804f4
Compare
4c0ea20
to
436113a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great POC! A few nits.
|
||
func (a *aggregator) count(name string, value int64, tags []string, rate float64) error { | ||
context := name + strings.Join(tags, "") | ||
a.countsM.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider thread 1 after executing the instruction at line 102 and thread 2 at line 79
Thread 1 takes the lock and run until line 105, execute a.countsM.RUnlock()
and stop (before executing count.sample(value)
)
As lock was released, thread 2 can run until line 97.
Thread 1 execute count.sample(value)
. It is increment the value in the map counts
(counts := a.counts
) but as values were already flushed (metrics = append(metrics, c.flushUnsafe())
) the increment count.sample(value)
is not take into account.
Maybe this is the reason of the name flushUnsafe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, we should not release the read lock until we sampled the metric. This should be fine as we won't slow down the sampling.
6d11097
to
0260f5c
Compare
436113a
to
8256491
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
} | ||
|
||
func (s *setMetric) sample(v string) { | ||
s.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As sample
is used inside a block a.countsM.RLock()
/ a.countsM.RUnlock()
, s.Lock()
is not required anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is to avoid concurrent write to the map, no ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well-written PR!
This should reduce by a lot the number of packets (and therefore the number of packets drop) between the client and the Agent. This should also improve performances in hot path when sampling the same metrics. Using fnv1a hash for the aggregator gives us a average ~12% improvement on the aggregation benchmarks.
ce91945
to
0b67271
Compare
First, early try of client side aggregation.
For now this looks promising. We're testing both best case scenarion (1 metric sampled a lot) and worst case scenario (a lot of metrics sampled only once).