12 changes: 12 additions & 0 deletions .github/workflows/build-common.yml
@@ -216,3 +216,15 @@ jobs:

- name: Test
run: ./gradlew ${{ matrix.module }}:smokeTest

- name: Create unique artifact name
if: failure()
run: |
echo "UPLOAD_ARTIFACT_NAME=${{ matrix.module }}" | sed 's/:/-/g' >> $GITHUB_ENV

- name: Upload smoke test reports
uses: actions/upload-artifact@v4
if: failure()
with:
name: ${{ env.UPLOAD_ARTIFACT_NAME }}
path: '**/build/reports/tests/smokeTest/**/*'
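
The "Create unique artifact name" step above is needed because Gradle module paths contain ':' characters, which actions/upload-artifact does not accept in artifact names, so the sed pipe maps every ':' to '-'. A minimal sketch of the equivalent mapping (the module path below is a made-up example, not one from this repository):

class ArtifactNameSketch {
  // Same ':' -> '-' mapping that the workflow's sed step applies to matrix.module,
  // since actions/upload-artifact rejects ':' in artifact names.
  static String sanitize(String gradleModulePath) {
    return gradleModulePath.replace(':', '-');
  }

  public static void main(String[] args) {
    // ":smoke-tests:fakeIngestion" is a hypothetical module path used only for illustration.
    System.out.println(sanitize(":smoke-tests:fakeIngestion")); // prints "-smoke-tests-fakeIngestion"
  }
}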
AgentLogExporter.java
@@ -10,23 +10,16 @@
import com.azure.monitor.opentelemetry.autoconfigure.implementation.logging.OperationLogger;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.QuickPulse;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration.SamplingOverride;
import com.microsoft.applicationinsights.agent.internal.sampling.AiFixedPercentageSampler;
import com.microsoft.applicationinsights.agent.internal.sampling.SamplingOverrides;
import com.microsoft.applicationinsights.agent.internal.telemetry.BatchItemProcessor;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryObservers;
import io.opentelemetry.api.logs.LoggerProvider;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.javaagent.bootstrap.CallDepth;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import io.opentelemetry.semconv.ExceptionAttributes;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
@@ -39,24 +32,13 @@ public class AgentLogExporter implements LogRecordExporter {
private static final OperationLogger exportingLogLogger =
new OperationLogger(AgentLogExporter.class, "Exporting log");

// TODO (trask) could implement this in a filtering LogExporter instead
private volatile int severityThreshold;

private final SamplingOverrides logSamplingOverrides;
private final SamplingOverrides exceptionSamplingOverrides;
private final LogDataMapper mapper;
private final Consumer<TelemetryItem> telemetryItemConsumer;

public AgentLogExporter(
int severityThreshold,
List<SamplingOverride> logSamplingOverrides,
List<SamplingOverride> exceptionSamplingOverrides,
LogDataMapper mapper,
@Nullable QuickPulse quickPulse,
BatchItemProcessor batchItemProcessor) {
this.severityThreshold = severityThreshold;
this.logSamplingOverrides = new SamplingOverrides(logSamplingOverrides);
this.exceptionSamplingOverrides = new SamplingOverrides(exceptionSamplingOverrides);
this.mapper = mapper;
telemetryItemConsumer =
telemetryItem -> {
@@ -70,10 +52,6 @@ public AgentLogExporter(
};
}

public void setSeverityThreshold(int severityThreshold) {
this.severityThreshold = severityThreshold;
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
// incrementing CallDepth for LoggerProvider causes the OpenTelemetry Java agent logging
@@ -107,47 +85,12 @@ private CompletableResultCode internalExport(Collection<LogRecordData> logs) {

private void internalExport(LogRecordData log) {
try {
int severityNumber = log.getSeverity().getSeverityNumber();
if (severityNumber < severityThreshold) {
return;
}
logger.debug("exporting log: {}", log);

String stack = log.getAttributes().get(ExceptionAttributes.EXCEPTION_STACKTRACE);
Double sampleRate = log.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE);

SamplingOverrides samplingOverrides =
stack != null ? exceptionSamplingOverrides : logSamplingOverrides;

SpanContext spanContext = log.getSpanContext();
Double parentSpanSampleRate = log.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE);

AiFixedPercentageSampler sampler = samplingOverrides.getOverride(log.getAttributes());

boolean hasSamplingOverride = sampler != null;

if (!hasSamplingOverride
&& spanContext.isValid()
&& !spanContext.getTraceFlags().isSampled()) {
// if there is no sampling override, and the log is part of an unsampled trace,
// then don't capture it
return;
}

Double sampleRate = null;
if (hasSamplingOverride) {
SamplingResult samplingResult = sampler.shouldSampleLog(spanContext, parentSpanSampleRate);
if (samplingResult.getDecision() != SamplingDecision.RECORD_AND_SAMPLE) {
return;
}
sampleRate = samplingResult.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE);
}

if (sampleRate == null) {
sampleRate = parentSpanSampleRate;
}

logger.debug("exporting log: {}", log);

// TODO (trask) no longer need to check AiSemanticAttributes.SAMPLE_RATE in map() method
// TODO (trask) get stack and sampleRate inside map() method instead of passing into
TelemetryItem telemetryItem = mapper.map(log, stack, sampleRate);
telemetryItemConsumer.accept(telemetryItem);

AzureMonitorLogFilteringProcessor.java (new file)
@@ -0,0 +1,191 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.applicationinsights.agent.internal.init;

import com.azure.core.util.logging.ClientLogger;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.AiSemanticAttributes;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration;
import com.microsoft.applicationinsights.agent.internal.sampling.AiFixedPercentageSampler;
import com.microsoft.applicationinsights.agent.internal.sampling.SamplingOverrides;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.LocalRootSpan;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.AttributesMap;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import io.opentelemetry.semconv.ExceptionAttributes;
import java.lang.reflect.Field;
import java.util.List;
import javax.annotation.Nullable;

public class AzureMonitorLogFilteringProcessor implements LogRecordProcessor {

private static final ClientLogger logger =
new ClientLogger(AzureMonitorLogFilteringProcessor.class);
private static final Field lockField;
private static final Field attributesMapField;

static {
Class<?> sdkReadWriteLogRecordClass = getSdkReadWriteLogRecordClass();
lockField = getLockField(sdkReadWriteLogRecordClass);
attributesMapField = getAttributesMapField(sdkReadWriteLogRecordClass);
}

private final SamplingOverrides logSamplingOverrides;
private final SamplingOverrides exceptionSamplingOverrides;
private final LogRecordProcessor batchLogRecordProcessor;

private volatile int severityThreshold;

public AzureMonitorLogFilteringProcessor(
List<Configuration.SamplingOverride> logSamplingOverrides,
List<Configuration.SamplingOverride> exceptionSamplingOverrides,
LogRecordProcessor batchLogRecordProcessor,
int severityThreshold) {

this.severityThreshold = severityThreshold;
this.logSamplingOverrides = new SamplingOverrides(logSamplingOverrides);
this.exceptionSamplingOverrides = new SamplingOverrides(exceptionSamplingOverrides);
this.batchLogRecordProcessor = batchLogRecordProcessor;
}

public void setSeverityThreshold(int severityThreshold) {
this.severityThreshold = severityThreshold;
}

@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {

int severityNumber = logRecord.getSeverity().getSeverityNumber();
if (severityNumber < severityThreshold) {
// quick return
return;
}

Double parentSpanSampleRate = null;
Span currentSpan = Span.fromContext(context);
if (currentSpan instanceof ReadableSpan) {
ReadableSpan readableSpan = (ReadableSpan) currentSpan;
parentSpanSampleRate = readableSpan.getAttribute(AiSemanticAttributes.SAMPLE_RATE);
}

// deal with sampling synchronously so that we only call setAttributeExceptionLogged()
// when we know we are emitting the exception (span sampling happens synchronously as well)

String stack = logRecord.getAttribute(ExceptionAttributes.EXCEPTION_STACKTRACE);

SamplingOverrides samplingOverrides =
stack != null ? exceptionSamplingOverrides : logSamplingOverrides;

SpanContext spanContext = logRecord.getSpanContext();

AiFixedPercentageSampler sampler = samplingOverrides.getOverride(logRecord.getAttributes());

boolean hasSamplingOverride = sampler != null;

if (!hasSamplingOverride && spanContext.isValid() && !spanContext.getTraceFlags().isSampled()) {
// if there is no sampling override, and the log is part of an unsampled trace,
// then don't capture it
return;
}

Double sampleRate = null;
if (hasSamplingOverride) {
SamplingResult samplingResult = sampler.shouldSampleLog(spanContext, parentSpanSampleRate);
if (samplingResult.getDecision() != SamplingDecision.RECORD_AND_SAMPLE) {
return;
}
sampleRate = samplingResult.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE);
}

if (sampleRate == null) {
sampleRate = parentSpanSampleRate;
}

if (sampleRate != null) {
logRecord.setAttribute(AiSemanticAttributes.SAMPLE_RATE, sampleRate);
}

setAttributeExceptionLogged(LocalRootSpan.fromContext(context), logRecord);

batchLogRecordProcessor.onEmit(context, logRecord);
}

@Override
public CompletableResultCode shutdown() {
return batchLogRecordProcessor.shutdown();
}

@Override
public CompletableResultCode forceFlush() {
return batchLogRecordProcessor.forceFlush();
}

@Override
public void close() {
batchLogRecordProcessor.close();
}

@Nullable
private static Class<?> getSdkReadWriteLogRecordClass() {
try {
return Class.forName("io.opentelemetry.sdk.logs.SdkReadWriteLogRecord");
} catch (ClassNotFoundException e) {
return null;
}
}

@Nullable
private static Field getLockField(Class<?> sdkReadWriteLogRecordClass) {
if (sdkReadWriteLogRecordClass == null) {
return null;
}
try {
Field lockField = sdkReadWriteLogRecordClass.getDeclaredField("lock");
lockField.setAccessible(true);
return lockField;
} catch (NoSuchFieldException e) {
return null;
}
}

@Nullable
private static Field getAttributesMapField(Class<?> sdkReadWriteLogRecordClass) {
if (sdkReadWriteLogRecordClass == null) {
return null;
}
try {
Field attributesMapField = sdkReadWriteLogRecordClass.getDeclaredField("attributes");
attributesMapField.setAccessible(true);
return attributesMapField;
} catch (NoSuchFieldException e) {
return null;
}
}

private static void setAttributeExceptionLogged(Span span, ReadWriteLogRecord logRecord) {
if (lockField == null || attributesMapField == null) {
return;
}
String stacktrace = null;
try {
synchronized (lockField.get(logRecord)) {
// TODO add `getAttribute()` to `ReadWriteLogRecord` upstream
stacktrace =
((AttributesMap) attributesMapField.get(logRecord))
.get(ExceptionAttributes.EXCEPTION_STACKTRACE);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (stacktrace != null) {
span.setAttribute(AiSemanticAttributes.LOGGED_EXCEPTION, stacktrace);
}
}
}
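
For context, a minimal wiring sketch (illustration only, not code from this PR) of how a filtering processor like this one could be registered in front of a batch processor on an SdkLoggerProvider. The SystemOutLogRecordExporter, the empty override lists, and the INFO threshold are placeholder assumptions; in the agent, the batch processor would wrap the real Azure Monitor exporter, and AzureMonitorLogFilteringProcessor would come from the agent-internal package shown above.

import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.exporter.logging.SystemOutLogRecordExporter;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
import java.util.Collections;

class LogPipelineSketch {
  static SdkLoggerProvider build() {
    // Placeholder wiring: empty sampling-override lists, a stdout exporter, and an
    // INFO severity threshold.
    AzureMonitorLogFilteringProcessor filteringProcessor =
        new AzureMonitorLogFilteringProcessor(
            Collections.emptyList(),
            Collections.emptyList(),
            BatchLogRecordProcessor.builder(SystemOutLogRecordExporter.create()).build(),
            Severity.INFO.getSeverityNumber());

    // Logs below the threshold, or belonging to unsampled traces with no sampling
    // override, are dropped in onEmit() before they ever reach the batch processor.
    return SdkLoggerProvider.builder()
        .addLogRecordProcessor(filteringProcessor)
        .build();
  }
}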