Skip to content

Commit fe3046f

Browse files
authored
Avoid race conditions between contributing and writing to a stat aggregate (#9473)
* Avoid race conditions between contributing and writing to a stat aggregate * Apply suggestions * improve comments
1 parent 67f94ae commit fe3046f

File tree

1 file changed

+45
-19
lines changed

1 file changed

+45
-19
lines changed

dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import datadog.trace.common.metrics.SignalItem.StopSignal;
66
import datadog.trace.core.util.LRUCache;
7-
import java.util.Iterator;
7+
import java.util.HashSet;
88
import java.util.Map;
99
import java.util.Queue;
1010
import java.util.Set;
@@ -121,12 +121,20 @@ public void accept(InboxItem item) {
121121
signal.ignore();
122122
}
123123
} else if (item instanceof Batch && !stopped) {
124-
Batch batch = (Batch) item;
124+
final Batch batch = (Batch) item;
125125
MetricKey key = batch.getKey();
126126
// important that it is still *this* batch pending, must not remove otherwise
127127
pending.remove(key, batch);
128-
AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric());
129-
batch.contributeTo(aggregate);
128+
// operations on the aggregates should be atomic so as not to potentially lose points.
129+
aggregates.compute(
130+
key,
131+
(k, v) -> {
132+
if (v == null) {
133+
v = new AggregateMetric();
134+
}
135+
batch.contributeTo(v);
136+
return v;
137+
});
130138
dirty = true;
131139
// return the batch for reuse
132140
batchPool.offer(batch);
@@ -138,13 +146,20 @@ private void report(long when, SignalItem signal) {
138146
boolean skipped = true;
139147
if (dirty) {
140148
try {
141-
expungeStaleAggregates();
142-
if (!aggregates.isEmpty()) {
149+
final Set<MetricKey> validKeys = expungeStaleAggregates();
150+
if (!validKeys.isEmpty()) {
143151
skipped = false;
144-
writer.startBucket(aggregates.size(), when, reportingIntervalNanos);
145-
for (Map.Entry<MetricKey, AggregateMetric> aggregate : aggregates.entrySet()) {
146-
writer.add(aggregate.getKey(), aggregate.getValue());
147-
aggregate.getValue().clear();
152+
writer.startBucket(validKeys.size(), when, reportingIntervalNanos);
153+
for (MetricKey key : validKeys) {
154+
// operations on the aggregates should be atomic so as not to potentially lose
155+
// points.
156+
aggregates.computeIfPresent(
157+
key,
158+
(k, v) -> {
159+
writer.add(k, v);
160+
v.clear();
161+
return v;
162+
});
148163
}
149164
// note that this may do IO and block
150165
writer.finishBucket();
@@ -161,16 +176,27 @@ private void report(long when, SignalItem signal) {
161176
}
162177
}
163178

164-
private void expungeStaleAggregates() {
165-
Iterator<Map.Entry<MetricKey, AggregateMetric>> it = aggregates.entrySet().iterator();
166-
while (it.hasNext()) {
167-
Map.Entry<MetricKey, AggregateMetric> pair = it.next();
168-
AggregateMetric metric = pair.getValue();
169-
if (metric.getHitCount() == 0) {
170-
it.remove();
171-
commonKeys.remove(pair.getKey());
172-
}
179+
/**
180+
* Remove keys whose values have zeroed metrics.
181+
*
182+
* @return the set of keys that are still valid.
183+
*/
184+
private Set<MetricKey> expungeStaleAggregates() {
185+
final HashSet<MetricKey> ret = new HashSet<>();
186+
for (MetricKey metricKey : new HashSet<>(aggregates.keySet())) {
187+
// operations on the aggregates should be atomic so as not to potentially lose points.
188+
aggregates.computeIfPresent(
189+
metricKey,
190+
(k, v) -> {
191+
if (v.getHitCount() == 0) {
192+
commonKeys.remove(k);
193+
return null;
194+
}
195+
ret.add(k);
196+
return v;
197+
});
173198
}
199+
return ret;
174200
}
175201

176202
private long wallClockTime() {

0 commit comments

Comments
 (0)