[prometheusremotewriteexporter] reduce allocations in createAttributes #35184
Conversation
Force-pushed from 6a5d2ca to c4d233a
// best to keep it around for the lifetime of the Go process. Due to this shared
// state, PrometheusConverter is NOT thread-safe and is only intended to be used by
// a single go-routine at a time.
// Each FromMetrics call should be followed by a Reset when the metrics can be safely
Should we emit a warning log or something if someone calls FromMetrics without Resetting?
Moved the reset to always be called inside FromMetrics so this is no longer a user concern.
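For readers following along, here is a minimal sketch of the change being described; reset and the elided conversion body are paraphrased assumptions, not the PR's exact code:

func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) {
	c.reset() // clear per-batch state up front so callers can't forget to do it
	tsMap := make(map[string]*prompb.TimeSeries)
	// ... fill tsMap from md as before ...
	return tsMap, nil
}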
Now that the user doesn't need to call reset, should we remove this part of the comment?
// Each FromMetrics call should be followed by a Reset.....
@@ -27,8 +27,7 @@ type Settings struct {
 }

 // FromMetrics converts pmetric.Metrics to Prometheus remote write format.
-func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) {
-	c := newPrometheusConverter()
+func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) {
Now that users create a struct before calling FromMetrics, we should consider moving any configuration or settings to the creation of the converter to simplify usage.
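A rough sketch of the API shape being suggested; NewPrometheusConverter and the settings field are hypothetical names, not code from this PR. FromMetrics could then drop its settings parameter and read configuration from the converter itself:

// Capture settings once at construction, so FromMetrics no longer
// needs a settings argument on every call.
func NewPrometheusConverter(settings Settings) *PrometheusConverter {
	return &PrometheusConverter{settings: settings}
}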
This makes sense, but now it's in a global sync.Pool rather than tied to a specific exporter instance, so it needs to be reusable across instances.
We don't plan to keep this forever, right? Ideally we'll be able to shard this to improve throughput, we're just hardcoding this to 1 because OTel's exporter helper doesn't ensure ordering. On the other hand, I agree that we shouldn't block optimizations based on something we want to do in the future 😬. @edma2, knowing that we'll eventually shard the output, any suggestions on how to do this without sacrificing your optimization?
I also wonder, since you can have multiple pipelines with multiple remote write exporters (e.g. sending data from a dev cluster to two destinations, dev and prod), whether that would break this too.
@ArthurSens my initial thought here is maybe wrap things in a sync.Pool.
@jmichalek132 Each exporter would have its own instance of PrometheusConverter.
This PR was marked stale due to lack of activity. It will be closed in 14 days.
@ArthurSens @dashpole @jmichalek132 I addressed comments and also changed the implementation so it's now in a sync.Pool. This now supports concurrent access from the exporter class in case it ever supports more than 1 worker at a time. Please take a look!
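A minimal sketch of the pooled approach described above, assuming a newPrometheusConverter constructor and that FromMetrics resets state internally; export is a hypothetical placeholder for the exporter's send path, and the PR's actual code may differ:

// One pool shared by all exporter instances; idle converters (and their
// grown backing arrays) get reused instead of reallocated.
var converterPool = sync.Pool{
	New: func() any { return newPrometheusConverter() },
}

func pushMetrics(md pmetric.Metrics, settings Settings) error {
	c := converterPool.Get().(*PrometheusConverter)
	defer converterPool.Put(c) // returned only after we're done with its output
	tsMap, err := c.FromMetrics(md, settings)
	if err != nil {
		return err
	}
	return export(tsMap) // must finish using tsMap before the converter is reused
}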
Force-pushed from b337721 to 6f941e3
Awesome edma! I'm struggling a bit to find time to review this one, just wanted to let you know that this is on my list :)
(open-telemetry#57) createAttributes was allocating a new label slice for every series, which generates mucho garbage (~30-40% of all allocations). Keep around a re-usable underlying array of labels to reduce allocations on the hot path.
Force-pushed from 6f941e3 to 928529a
l := c.labelsMap
clear(l)
Could we also clear labelsMap when we call reset()? I think it would be cleaner if we reset the state in one single place instead, or is there any particular reason to do it here?
For the same reason as https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35184/files/928529a1cf587e8e5b29bd4880f2c36157eb8194#r1829677356, we want to isolate the contents of this map between calls to createAttributes, so we do that by clearing it.
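As a small aside for readers: Go 1.21's clear builtin empties a map in place while keeping its allocated storage, which is what makes this per-call reuse cheap. A self-contained illustration:

package main

import "fmt"

func main() {
	labels := map[string]string{"job": "demo", "instance": "localhost:9090"}
	clear(labels)            // removes every entry; the map itself stays allocated
	fmt.Println(len(labels)) // 0
	labels["job"] = "demo"   // repopulating reuses the existing map storage
}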
That makes sense, thank you! Could we add a comment explaining it?
-	labels = labels[:0]
+	startIndex := len(c.labels)
 	for k, v := range l {
-		labels = append(labels, prompb.Label{Name: k, Value: v})
+		c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
 	}

-	return labels
+	return c.labels[startIndex:]
Is there any possibility that len(c.labels) is not 0 here? It's reset every time we call FromMetrics and I couldn't find any other place in the code where we write to this array, so why not just return c.labels and not worry about startIndex? I might be missing something, but it feels like we're overcomplicating things here:

for k, v := range l {
	c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
}
return c.labels
startIndex is important for keeping the returned slices isolated from each other while sharing the same underlying array within a single FromMetrics call. It is 0 only for the first series of a batch.

Here is how it works: FromMetrics is called once per batch, and createAttributes for every series within the batch. We want to re-use the backing array of the labels slice for all series within a single batch. We do that by appending the labels of each series to the end of the slice. Finally, we return only starting from startIndex so the caller doesn't see labels from other series (while reusing the same backing array, which naturally grows up to the size needed to fit a single FromMetrics call).

For example, if X1...X4 are labels from series X and Y1...Y3 are labels from series Y, then the backing array of c.labels will look like [X1, X2, X3, X4, Y1, Y2, Y3] after calling createAttributes twice (this is a simplification, as the backing array will probably have excess capacity from resizing or previous calls). Meanwhile, the first call to createAttributes will have returned [X1, X2, X3, X4] and the second call [Y1, Y2, Y3]. On the next FromMetrics call the index is reset to 0 and we can re-use the entire array with zero allocations.
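To make the slicing trick concrete, here is a self-contained toy version of the same idea, using plain strings instead of prompb.Label and a capacity chosen so no mid-batch reallocation occurs:

package main

import "fmt"

func main() {
	buf := make([]string, 0, 8) // shared backing array, analogous to c.labels

	appendSeries := func(labels ...string) []string {
		startIndex := len(buf)
		buf = append(buf, labels...)
		return buf[startIndex:] // each caller sees only its own labels
	}

	x := appendSeries("X1", "X2", "X3", "X4")
	y := appendSeries("Y1", "Y2", "Y3")
	fmt.Println(x, y)     // [X1 X2 X3 X4] [Y1 Y2 Y3]
	fmt.Println(len(buf)) // 7: one backing array holds both series

	buf = buf[:0] // the per-batch "reset": truncate but keep capacity
}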
Perfect, thank you for the explanation :)
Now what I'm thinking is whether we have tests that ensure concurrency works, mostly to make sure we don't break the non-thread-safe promise by accident.
Alright, thanks for the patience and for kindly explaining everything here :)
Regarding all the code so far, it LGTM! Let's just add an extra test that certifies we're not calling FromMetrics concurrently in other areas of the codebase. This test will probably live in the exporters that use the translator pkg, e.g. prometheusremotewriteexporter.
Maybe something like this (pseudo-code):
func generateMetric(i int) pmetric.Metrics {
	m := GenerateMetricsOneMetric()
	m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().AppendEmpty().SetIntValue(int64(i))
	return m
}

func TestPushMetricsConcurrency(t *testing.T) {
	// Create mock where prwexporter sends metrics to
	// Create prwexporter
	n := 1000
	metrics := make([]pmetric.Metrics, 0, n)
	for i := 0; i < n; i++ {
		metrics = append(metrics, generateMetric(i))
	}
	var wg sync.WaitGroup
	wg.Add(n)
	for i := range metrics {
		go func(m pmetric.Metrics) {
			defer wg.Done()
			prwexporter.PushMetrics(context.Background(), m)
		}(metrics[i])
	}
	wg.Wait()
	// assert metrics arrived in the mock.
}
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'
That's an enhancement, right? Not a breaking change
Description:

While profiling the collector, we found that the createAttributes function was responsible for a significant chunk of allocations (30-40%), which was leading to high CPU usage spent in GC. createAttributes is responsible for converting the attributes of a given data point to Prometheus labels. For simplicity, it allocates a new labels slice for every data point. We found that reducing allocations here significantly reduced GC time in our environment (in some deployments by as much as ~50%).

The strategy in this PR is to reuse the slice's backing array as much as possible. The backing array will automatically resize as needed (batching with a batch processor will effectively set an upper bound). Note: we don't need to synchronize access to this (e.g. with a sync.Pool) since the exporter is configured with 1 consumer.

Link to tracking Issue:
Testing:
Modified unit tests and ran benchmarks locally.
Works in our production environment.
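For anyone reproducing the numbers, a benchmark along these lines (GenerateMetricsOneMetric and newPrometheusConverter follow the snippets quoted above; treat this as a sketch, not the PR's actual benchmark) can be run with go test -bench=FromMetrics -benchmem and compared across commits with benchstat:

func BenchmarkFromMetrics(b *testing.B) {
	md := GenerateMetricsOneMetric()
	c := newPrometheusConverter()
	b.ReportAllocs() // surfaces allocs/op, the metric this PR targets
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := c.FromMetrics(md, Settings{}); err != nil {
			b.Fatal(err)
		}
	}
}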
benchstat output
Documentation: