Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,9 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
SPAN_KINDS.computeIfAbsent(
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
getPeerTags(span, spanKind.toString()));
boolean isNewKey = false;
MetricKey key = keys.putIfAbsent(newKey, newKey);
if (null == key) {
key = newKey;
isNewKey = true;
}
long tag = (span.getError() > 0 ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L);
long durationNanos = span.getDurationNano();
Expand All @@ -340,7 +338,6 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
}
// recycle the older key
key = batch.getKey();
isNewKey = false;
}
batch = newBatch(key);
batch.add(tag, durationNanos);
Expand All @@ -349,8 +346,8 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
pending.put(key, batch);
// must offer to the queue after adding to pending
inbox.offer(batch);
// force keep keys we haven't seen before or errors
return isNewKey || span.getError() > 0;
// force keep keys if there are errors
return span.getError() > 0;
}

private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,45 +661,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
aggregator.close()
}

def "aggregator should force keep the first of each key it sees"() {
setup:
int maxAggregates = 10
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
long duration = 100
aggregator.start()

when:
def overrides = new boolean[10]
for (int i = 0; i < 5; ++i) {
overrides[i] = aggregator.publish([
new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
])
}
for (int i = 0; i < 5; ++i) {
overrides[i + 5] = aggregator.publish([
new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
])
}

then: "override only the first of each point in the interval"
for (int i = 0; i < 5; ++i) {
assert overrides[i]
}
// these were all repeats, so should be ignored
for (int i = 5; i < 10; ++i) {
assert !overrides[i]
}

cleanup:
aggregator.close()
}

def "should be resilient to serialization errors"() {
setup:
int maxAggregates = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ class MetricsReliabilityTest extends DDCoreSpecification {
then: "should have sent statistics and informed the agent that we calculate the stats"
assert state.receivedClientComputedHeader
assert state.receivedStats
// 1 trace processed. not a p0 drop (first time we see it). No errors
assertMetrics(healthMetrics, 1, 0, 1, 0, 0)
// 1 trace processed. 1 p0 dropped. No errors
assertMetrics(healthMetrics, 1, 1, 1, 0, 0)


when: "simulate an agent downgrade"
Expand All @@ -95,8 +95,8 @@ class MetricsReliabilityTest extends DDCoreSpecification {
then: "a discovery should have done - we do not support anymore stats calculation"
state.latch.await()
assert !featuresDiscovery.supportsMetrics()
// 2 traces processed. 1 p0 dropped. 2 requests and 1 downgrade no errors
assertMetrics(healthMetrics, 2, 1, 2, 0, 1)
// 2 traces processed. 2 p0 dropped. 2 requests and 1 downgrade no errors
assertMetrics(healthMetrics, 2, 2, 2, 0, 1)


when: "a span is published"
Expand All @@ -109,7 +109,7 @@ class MetricsReliabilityTest extends DDCoreSpecification {
assert !state.receivedClientComputedHeader
assert !state.receivedStats
// 2 traces processed. 2 p0 dropped. 2 requests and 1 downgrade, no errors
assertMetrics(healthMetrics, 2, 1, 2, 0, 1)
assertMetrics(healthMetrics, 2, 2, 2, 0, 1)

when: "we detect that the agent can calculate the stats again"
state.reset(true)
Expand All @@ -128,7 +128,7 @@ class MetricsReliabilityTest extends DDCoreSpecification {
assert state.receivedClientComputedHeader
assert state.receivedStats
// 3 traces processed. 3 p0 dropped. 3 requests and 1 downgrade, no errors
assertMetrics(healthMetrics, 3, 2, 3, 0, 1)
assertMetrics(healthMetrics, 3, 3, 3, 0, 1)

when: "an error occurred on the agent stats endpoint"
state.reset(true, 500)
Expand All @@ -140,7 +140,7 @@ class MetricsReliabilityTest extends DDCoreSpecification {
assert state.receivedClientComputedHeader
assert state.receivedStats
// 4 traces processed. 4 p0 dropped. 4 requests and 1 downgrade - 1 error
assertMetrics(healthMetrics, 4, 3, 4, 1, 1)
assertMetrics(healthMetrics, 4, 4, 4, 1, 1)

when: "the next call succeed"
state.reset(true)
Expand All @@ -153,7 +153,7 @@ class MetricsReliabilityTest extends DDCoreSpecification {
assert state.receivedStats
// 5 traces processed. 4 p0 dropped (this one is errored so it's not dropped).
// 5 requests and 1 downgrade - 1 error
assertMetrics(healthMetrics, 5, 3, 5, 1, 1)
assertMetrics(healthMetrics, 5, 4, 5, 1, 1)

cleanup:
tracer.close()
Expand Down
Loading