 import datadog.trace.common.metrics.SignalItem.StopSignal;
 import datadog.trace.core.util.LRUCache;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
@@ -121,20 +121,12 @@ public void accept(InboxItem item) {
         signal.ignore();
       }
     } else if (item instanceof Batch && !stopped) {
-      final Batch batch = (Batch) item;
+      Batch batch = (Batch) item;
       MetricKey key = batch.getKey();
       // important that it is still *this* batch pending, must not remove otherwise
       pending.remove(key, batch);
-      // operations concerning the aggregates should be atomic not to potentially loose points.
-      aggregates.compute(
-          key,
-          (k, v) -> {
-            if (v == null) {
-              v = new AggregateMetric();
-            }
-            batch.contributeTo(v);
-            return v;
-          });
+      AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric());
+      batch.contributeTo(aggregate);
       dirty = true;
       // return the batch for reuse
       batchPool.offer(batch);
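
The accept() change above replaces the atomic compute(...) call with a plain computeIfAbsent followed by an in-place contributeTo, which appears to assume the aggregates map is only mutated from the aggregator's own thread. A minimal sketch of that pattern, with hypothetical stand-in types (String keys, a bare Aggregate counter) rather than the real MetricKey/AggregateMetric:

import java.util.HashMap;
import java.util.Map;

class AcceptSketch {
  // hypothetical stand-in for AggregateMetric
  static final class Aggregate {
    long hits;

    void add(long n) {
      hits += n;
    }
  }

  private final Map<String, Aggregate> aggregates = new HashMap<>();

  void accept(String key, long batchHits) {
    // lazily create the aggregate, then fold the batch into it in place;
    // safe only while a single thread owns the map
    aggregates.computeIfAbsent(key, k -> new Aggregate()).add(batchHits);
  }
}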
@@ -146,20 +138,13 @@ private void report(long when, SignalItem signal) {
     boolean skipped = true;
     if (dirty) {
       try {
-        final Set<MetricKey> validKeys = expungeStaleAggregates();
-        if (!validKeys.isEmpty()) {
+        expungeStaleAggregates();
+        if (!aggregates.isEmpty()) {
           skipped = false;
-          writer.startBucket(validKeys.size(), when, reportingIntervalNanos);
-          for (MetricKey key : validKeys) {
-            // operations concerning the aggregates should be atomic not to potentially loose
-            // points.
-            aggregates.computeIfPresent(
-                key,
-                (k, v) -> {
-                  writer.add(k, v);
-                  v.clear();
-                  return v;
-                });
+          writer.startBucket(aggregates.size(), when, reportingIntervalNanos);
+          for (Map.Entry<MetricKey, AggregateMetric> aggregate : aggregates.entrySet()) {
+            writer.add(aggregate.getKey(), aggregate.getValue());
+            aggregate.getValue().clear();
           }
           // note that this may do IO and block
           writer.finishBucket();
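
The reworked report loop above writes every surviving aggregate into the bucket and resets it in the same pass, instead of re-resolving each key through computeIfPresent. A small sketch of that flush pattern, again with hypothetical Writer and Aggregate types standing in for the real writer and AggregateMetric:

import java.util.Map;

class ReportSketch {
  // hypothetical sink, standing in for the real metrics writer
  interface Writer {
    void startBucket(int size, long when, long intervalNanos);

    void add(String key, long hits);

    void finishBucket();
  }

  static final class Aggregate {
    long hits;

    void clear() {
      hits = 0;
    }
  }

  static void report(Map<String, Aggregate> aggregates, Writer writer, long when, long intervalNanos) {
    if (aggregates.isEmpty()) {
      return; // nothing to flush this interval
    }
    writer.startBucket(aggregates.size(), when, intervalNanos);
    for (Map.Entry<String, Aggregate> entry : aggregates.entrySet()) {
      writer.add(entry.getKey(), entry.getValue().hits);
      entry.getValue().clear(); // keep the key, reset the counters for the next interval
    }
    writer.finishBucket(); // may do IO and block
  }
}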
@@ -176,27 +161,16 @@ private void report(long when, SignalItem signal) {
     }
   }
 
-  /**
-   * Remove keys whose values have zeroed metrics.
-   *
-   * @return a set containing the keys still valid.
-   */
-  private Set<MetricKey> expungeStaleAggregates() {
-    final HashSet<MetricKey> ret = new HashSet<>();
-    for (MetricKey metricKey : new HashSet<>(aggregates.keySet())) {
-      // operations concerning the aggregates should be atomic not to potentially loose points.
-      aggregates.computeIfPresent(
-          metricKey,
-          (k, v) -> {
-            if (v.getHitCount() == 0) {
-              commonKeys.remove(k);
-              return null;
-            }
-            ret.add(k);
-            return v;
-          });
+  private void expungeStaleAggregates() {
+    Iterator<Map.Entry<MetricKey, AggregateMetric>> it = aggregates.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<MetricKey, AggregateMetric> pair = it.next();
+      AggregateMetric metric = pair.getValue();
+      if (metric.getHitCount() == 0) {
+        it.remove();
+        commonKeys.remove(pair.getKey());
+      }
     }
-    return ret;
   }
 
   private long wallClockTime() {
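
The new expungeStaleAggregates() drops zero-hit entries with Iterator.remove() instead of copying the key set and returning the keys that remain valid. A minimal sketch of that removal pattern, using a plain hit-count map as a hypothetical stand-in for the aggregates map:

import java.util.Iterator;
import java.util.Map;

class ExpungeSketch {
  static void expungeStale(Map<String, Long> hitCounts) {
    Iterator<Map.Entry<String, Long>> it = hitCounts.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (entry.getValue() == 0L) {
        // Iterator.remove() is the safe way to drop entries while iterating
        it.remove();
      }
    }
  }
}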