Skip to content

Commit

Permalink
Merge pull request quarkusio#43983 from alesj/simplespanprocessor_i29448
Browse files Browse the repository at this point in the history
Add SimpleSpanProcessor support
  • Loading branch information
brunobat authored Nov 11, 2024
2 parents 7a6b0ee + 95fea0b commit e7fb440
Show file tree
Hide file tree
Showing 24 changed files with 368 additions and 52 deletions.
8 changes: 8 additions & 0 deletions docs/src/main/asciidoc/_includes/opentelemetry-config.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,11 @@ quarkus.otel.exporter.otlp.logs.endpoint=http://logs-uri:4317 // <3>
<1> The endpoint for the traces exporter.
<2> The endpoint for the metrics exporter.
<3> The endpoint for the logs exporter.

If you need that your spans and logs to be exported directly as they finish
(e.g. in a serverless environment / application), you can set this property to `true`.
This replaces the default batching of data.
[source,properties]
----
quarkus.otel.simple=true
----
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ AdditionalBeanBuildItem ensureProducerIsRetained() {
return AdditionalBeanBuildItem.builder()
.setUnremovable()
.addBeanClasses(
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.SimpleLogRecordProcessorCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TracingResourceCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.SamplerCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TracerProviderCustomizer.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterConfigBuilder;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.OTelExporterRecorder;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.tls.TlsConfigurationRegistry;
import io.quarkus.tls.TlsRegistryBuildItem;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
Expand Down Expand Up @@ -84,7 +84,8 @@ void config(BuildProducer<RunTimeConfigBuilderBuildItem> runTimeConfigBuilderPro
@BuildStep(onlyIf = OtlpExporterProcessor.OtlpTracingExporterEnabled.class)
@Record(ExecutionTime.RUNTIME_INIT)
@Consume(TlsRegistryBuildItem.class)
void createBatchSpanProcessor(OTelExporterRecorder recorder,
void createSpanProcessor(OTelExporterRecorder recorder,
OTelBuildConfig oTelBuildConfig,
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
CoreVertxBuildItem vertxBuildItem,
Expand All @@ -95,16 +96,16 @@ void createBatchSpanProcessor(OTelExporterRecorder recorder,
return;
}
syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem
.configure(LateBoundBatchSpanProcessor.class)
.configure(LateBoundSpanProcessor.class)
.types(SpanProcessor.class)
.setRuntimeInit()
.scope(Singleton.class)
.unremovable()
.addInjectionPoint(ParameterizedType.create(DotName.createSimple(Instance.class),
new Type[] { ClassType.create(DotName.createSimple(SpanExporter.class.getName())) }, null))
.addInjectionPoint(ClassType.create(DotName.createSimple(TlsConfigurationRegistry.class)))
.createWith(recorder.batchSpanProcessorForOtlp(otelRuntimeConfig, exporterRuntimeConfig,
vertxBuildItem.getVertx()))
.createWith(recorder.spanProcessorForOtlp(oTelBuildConfig, otelRuntimeConfig,
exporterRuntimeConfig, vertxBuildItem.getVertx()))
.done());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.HttpInstrumenterVertxTracer;
import io.quarkus.test.QuarkusUnitTest;
import io.vertx.core.spi.observability.HttpRequest;
Expand All @@ -26,7 +26,7 @@ public class OpenTelemetryDisabledSdkTest {
.overrideConfigKey("quarkus.otel.sdk.disabled", "true");

@Inject
LateBoundBatchSpanProcessor batchSpanProcessor;
LateBoundSpanProcessor spanProcessor;

@Inject
OpenTelemetry openTelemetry;
Expand All @@ -40,7 +40,7 @@ public class OpenTelemetryDisabledSdkTest {
@Test
void testNoTracer() {
// The OTel API doesn't provide a clear way to check if a tracer is an effective NOOP tracer.
Assertions.assertTrue(batchSpanProcessor.isDelegateNull(), "BatchSpanProcessor delegate must not be set");
Assertions.assertTrue(spanProcessor.isDelegateNull(), "SpanProcessor delegate must not be set");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.test.QuarkusUnitTest;

public class OtlpTraceExporterDisabledTest {
Expand All @@ -25,15 +25,15 @@ public class OtlpTraceExporterDisabledTest {
OpenTelemetry openTelemetry;

@Inject
Instance<LateBoundBatchSpanProcessor> lateBoundBatchSpanProcessorInstance;
Instance<LateBoundSpanProcessor> lateBoundSpanProcessorInstance;

@Inject
Instance<MetricExporter> metricExporters;

@Test
void testOpenTelemetryButNoBatchSpanProcessor() {
assertNotNull(openTelemetry);
assertFalse(lateBoundBatchSpanProcessorInstance.isResolvable());
assertFalse(lateBoundSpanProcessorInstance.isResolvable());
assertFalse(metricExporters.isResolvable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.lang.reflect.InvocationTargetException;
import java.util.Locale;

import jakarta.inject.Inject;

Expand Down Expand Up @@ -36,6 +37,7 @@ public class OpenTelemetrySamplerConfigTest {
void test() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
Sampler sampler = TestUtil.getSampler(openTelemetry);

assertEquals(String.format("TraceIdRatioBased{%.6f}", 0.5d), sampler.getDescription());
// Fix the locale to ROOT, so we don't get 0,500000
assertEquals(String.format(Locale.ROOT, "TraceIdRatioBased{%.6f}", 0.5d), sampler.getDescription());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.IdGenerator;
Expand All @@ -27,7 +31,7 @@
import io.quarkus.arc.All;
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.propagation.TextMapPropagatorCustomizer;
import io.quarkus.opentelemetry.runtime.tracing.DelayedAttributes;
import io.quarkus.opentelemetry.runtime.tracing.DropTargetsSampler;
Expand All @@ -39,6 +43,47 @@ public interface AutoConfiguredOpenTelemetrySdkBuilderCustomizer {

void customize(AutoConfiguredOpenTelemetrySdkBuilder builder);

@Singleton
final class SimpleLogRecordProcessorCustomizer implements AutoConfiguredOpenTelemetrySdkBuilderCustomizer {
private SimpleLogRecordProcessorBiFunction biFunction;

public SimpleLogRecordProcessorCustomizer(
OTelBuildConfig oTelBuildConfig,
Instance<LogRecordExporter> ilre) {
if (oTelBuildConfig.simple() && ilre.isResolvable()) {
LogRecordProcessor lrp = SimpleLogRecordProcessor.create(ilre.get());
this.biFunction = new SimpleLogRecordProcessorBiFunction(lrp);
}
}

@Override
public void customize(AutoConfiguredOpenTelemetrySdkBuilder builder) {
if (biFunction != null) {
builder.addLogRecordProcessorCustomizer(biFunction);
}
}
}

class SimpleLogRecordProcessorBiFunction
implements BiFunction<LogRecordProcessor, ConfigProperties, LogRecordProcessor> {

private final LogRecordProcessor logRecordProcessor;

public SimpleLogRecordProcessorBiFunction(LogRecordProcessor logRecordProcessor) {
this.logRecordProcessor = logRecordProcessor;
}

@Override
public LogRecordProcessor apply(LogRecordProcessor lrp, ConfigProperties cp) {
// only change batch lrp, leave others
if (lrp instanceof BatchLogRecordProcessor) {
return logRecordProcessor;
} else {
return lrp;
}
}
}

@Singleton
final class TracingResourceCustomizer implements AutoConfiguredOpenTelemetrySdkBuilderCustomizer {

Expand Down Expand Up @@ -174,7 +219,7 @@ public SdkTracerProviderBuilder apply(SdkTracerProviderBuilder tracerProviderBui
spanProcessors.stream().filter(new Predicate<SpanProcessor>() {
@Override
public boolean test(SpanProcessor sp) {
return !(sp instanceof RemoveableLateBoundBatchSpanProcessor);
return !(sp instanceof RemoveableLateBoundSpanProcessor);
}
})
.forEach(tracerProviderBuilder::addSpanProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ public interface OTelBuildConfig {
@WithDefault("true")
boolean enabled();

/**
* Should we use simple processor for spans and log records.
* This will disable batch processing and the exporter will send
* telemetry data right away.
* This is recommended for serverless applications.
* <p>
* Defaults to <code>false</code>.
*/
@WithDefault("false")
boolean simple();

/**
* Trace exporter configurations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregationUtil;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.arc.SyntheticCreationalContext;
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.BatchSpanProcessorConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.*;
import io.quarkus.opentelemetry.runtime.exporter.otlp.logs.NoopLogRecordExporter;
Expand All @@ -48,8 +52,8 @@
import io.quarkus.opentelemetry.runtime.exporter.otlp.metrics.VertxHttpMetricsExporter;
import io.quarkus.opentelemetry.runtime.exporter.otlp.sender.VertxGrpcSender;
import io.quarkus.opentelemetry.runtime.exporter.otlp.sender.VertxHttpSender;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.VertxGrpcSpanExporter;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.VertxHttpSpanExporter;
import io.quarkus.runtime.annotations.Recorder;
Expand All @@ -68,23 +72,24 @@ public class OTelExporterRecorder {
public static final String BASE2EXPONENTIAL_AGGREGATION_NAME = AggregationUtil
.aggregationName(Aggregation.base2ExponentialBucketHistogram());

public Function<SyntheticCreationalContext<LateBoundBatchSpanProcessor>, LateBoundBatchSpanProcessor> batchSpanProcessorForOtlp(
public Function<SyntheticCreationalContext<LateBoundSpanProcessor>, LateBoundSpanProcessor> spanProcessorForOtlp(
OTelBuildConfig oTelBuildConfig,
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
Supplier<Vertx> vertx) {
URI baseUri = getTracesUri(exporterRuntimeConfig); // do the creation and validation here in order to preserve backward compatibility
return new Function<>() {
@Override
public LateBoundBatchSpanProcessor apply(
SyntheticCreationalContext<LateBoundBatchSpanProcessor> context) {
public LateBoundSpanProcessor apply(
SyntheticCreationalContext<LateBoundSpanProcessor> context) {
if (otelRuntimeConfig.sdkDisabled() || baseUri == null) {
return RemoveableLateBoundBatchSpanProcessor.INSTANCE;
return RemoveableLateBoundSpanProcessor.INSTANCE;
}
// Only create the OtlpGrpcSpanExporter if an endpoint was set in runtime config and was properly validated at startup
Instance<SpanExporter> spanExporters = context.getInjectedReference(new TypeLiteral<>() {
});
if (!spanExporters.isUnsatisfied()) {
return RemoveableLateBoundBatchSpanProcessor.INSTANCE;
return RemoveableLateBoundSpanProcessor.INSTANCE;
}

try {
Expand All @@ -93,15 +98,21 @@ public LateBoundBatchSpanProcessor apply(
var spanExporter = createSpanExporter(exporterRuntimeConfig, vertx.get(), baseUri,
tlsConfigurationRegistry);

BatchSpanProcessorBuilder processorBuilder = BatchSpanProcessor.builder(spanExporter);
if (oTelBuildConfig.simple()) {
SimpleSpanProcessorBuilder processorBuilder = SimpleSpanProcessor.builder(spanExporter);
return new LateBoundSpanProcessor(processorBuilder.build());
} else {
BatchSpanProcessorBuilder processorBuilder = BatchSpanProcessor.builder(spanExporter);

processorBuilder.setScheduleDelay(otelRuntimeConfig.bsp().scheduleDelay());
processorBuilder.setMaxQueueSize(otelRuntimeConfig.bsp().maxQueueSize());
processorBuilder.setMaxExportBatchSize(otelRuntimeConfig.bsp().maxExportBatchSize());
processorBuilder.setExporterTimeout(otelRuntimeConfig.bsp().exportTimeout());
// processorBuilder.setMeterProvider() // TODO add meter provider to span processor.
BatchSpanProcessorConfig bspc = otelRuntimeConfig.bsp();
processorBuilder.setScheduleDelay(bspc.scheduleDelay());
processorBuilder.setMaxQueueSize(bspc.maxQueueSize());
processorBuilder.setMaxExportBatchSize(bspc.maxExportBatchSize());
processorBuilder.setExporterTimeout(bspc.exportTimeout());
// processorBuilder.setMeterProvider() // TODO add meter provider to span processor.

return new LateBoundBatchSpanProcessor(processorBuilder.build());
return new LateBoundSpanProcessor(processorBuilder.build());
}
} catch (IllegalArgumentException iae) {
throw new IllegalStateException("Unable to install OTLP Exporter", iae);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

/**
* Class to facilitate a delay in when the worker thread inside {@link BatchSpanProcessor}
* Class to facilitate a delay in when the worker thread inside {@link SpanProcessor}
* is started, enabling Quarkus to instantiate a {@link io.opentelemetry.api.trace.TracerProvider}
* during static initialization and set a {@link BatchSpanProcessor} delegate during runtime initialization.
* during static initialization and set a {@link SpanProcessor} delegate during runtime initialization.
*/
public class LateBoundBatchSpanProcessor implements SpanProcessor {
private static final Logger log = Logger.getLogger(LateBoundBatchSpanProcessor.class);
public class LateBoundSpanProcessor implements SpanProcessor {
private static final Logger log = Logger.getLogger(LateBoundSpanProcessor.class);

private boolean warningLogged = false;
private BatchSpanProcessor delegate;
private SpanProcessor delegate;

public LateBoundBatchSpanProcessor(BatchSpanProcessor delegate) {
public LateBoundSpanProcessor(SpanProcessor delegate) {
this.delegate = delegate;
}

Expand Down Expand Up @@ -104,7 +103,7 @@ private void resetDelegate() {
*/
private void logDelegateNotFound() {
if (!warningLogged) {
log.warn("No BatchSpanProcessor delegate specified, no action taken.");
log.warn("No SpanProcessor delegate specified, no action taken.");
warningLogged = true;
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.opentelemetry.runtime.exporter.otlp.tracing;

import io.quarkus.opentelemetry.runtime.AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TracerProviderCustomizer;

/**
* The only point in having this class is to allow {@link TracerProviderCustomizer}
* to easily ignore the configured {@link LateBoundSpanProcessor}.
*/
public final class RemoveableLateBoundSpanProcessor extends LateBoundSpanProcessor {

public static final RemoveableLateBoundSpanProcessor INSTANCE = new RemoveableLateBoundSpanProcessor();

private RemoveableLateBoundSpanProcessor() {
super(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.quarkus.it.opentelemetry.vertx.exporter;

import java.util.Map;

import io.quarkus.test.junit.QuarkusTestProfile;

public final class SimpleProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
return Map.of("quarkus.otel.simple", "true");
}
}
Loading

0 comments on commit e7fb440

Please sign in to comment.