Skip to content

Commit

Permalink
Add memory mode support to OTLP exporters (#6430)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored May 9, 2024
1 parent 715211e commit 0d2d67e
Show file tree
Hide file tree
Showing 34 changed files with 499 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender;
import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void export(

private static ManagedChannel defaultGrpcChannel;

private static GrpcExporter<TraceRequestMarshaler> upstreamGrpcExporter;
private static GrpcExporter<Marshaler> upstreamGrpcExporter;
private static GrpcExporter<TraceRequestMarshaler> okhttpGrpcSender;
private static HttpExporter<TraceRequestMarshaler> httpExporter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -22,14 +28,18 @@
@ThreadSafe
public final class OtlpHttpLogRecordExporter implements LogRecordExporter {

private final HttpExporterBuilder<LogsRequestMarshaler> builder;
private final HttpExporter<LogsRequestMarshaler> delegate;
private final Deque<LowAllocationLogsRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final MemoryMode memoryMode;

OtlpHttpLogRecordExporter(
HttpExporterBuilder<LogsRequestMarshaler> builder,
HttpExporter<LogsRequestMarshaler> delegate) {
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.memoryMode = memoryMode;
}

/**
Expand Down Expand Up @@ -61,7 +71,7 @@ public static OtlpHttpLogRecordExporterBuilder builder() {
* @since 1.29.0
*/
public OtlpHttpLogRecordExporterBuilder toBuilder() {
return new OtlpHttpLogRecordExporterBuilder(builder.copy());
return new OtlpHttpLogRecordExporterBuilder(builder.copy(), memoryMode);
}

/**
Expand All @@ -72,8 +82,24 @@ public OtlpHttpLogRecordExporterBuilder toBuilder() {
*/
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
LogsRequestMarshaler exportRequest = LogsRequestMarshaler.create(logs);
return delegate.export(exportRequest, logs.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationLogsRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationLogsRequestMarshaler();
}
LowAllocationLogsRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(logs);
return delegate
.export(exportMarshaler, logs.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
LogsRequestMarshaler request = LogsRequestMarshaler.create(logs);
return delegate.export(request, logs.size());
}

@Override
Expand All @@ -89,6 +115,9 @@ public CompletableResultCode shutdown() {

@Override
public String toString() {
return "OtlpHttpLogRecordExporter{" + builder.toString(false) + "}";
StringJoiner joiner = new StringJoiner(", ", "OtlpHttpLogRecordExporter{", "}");
joiner.add(builder.toString(false));
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
Expand All @@ -33,16 +34,19 @@
public final class OtlpHttpLogRecordExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/logs";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<LogsRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;

OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder<LogsRequestMarshaler> delegate) {
OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders);
}

OtlpHttpLogRecordExporterBuilder() {
this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT));
this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE);
}

/**
Expand Down Expand Up @@ -206,12 +210,19 @@ public OtlpHttpLogRecordExporterBuilder setMeterProvider(
return this;
}

/** Set the {@link MemoryMode}. */
OtlpHttpLogRecordExporterBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
return this;
}

/**
* Constructs a new instance of the exporter based on the builder's values.
*
* @return a new exporter's instance
*/
public OtlpHttpLogRecordExporter build() {
return new OtlpHttpLogRecordExporter(delegate, delegate.build());
return new OtlpHttpLogRecordExporter(delegate, delegate.build(), memoryMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.LowAllocationMetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
Expand All @@ -17,7 +19,9 @@
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -29,15 +33,16 @@
@ThreadSafe
public final class OtlpHttpMetricExporter implements MetricExporter {

private final HttpExporterBuilder<MetricsRequestMarshaler> builder;
private final HttpExporter<MetricsRequestMarshaler> delegate;
private final Deque<LowAllocationMetricsRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final AggregationTemporalitySelector aggregationTemporalitySelector;
private final DefaultAggregationSelector defaultAggregationSelector;
private final MemoryMode memoryMode;

OtlpHttpMetricExporter(
HttpExporterBuilder<MetricsRequestMarshaler> builder,
HttpExporter<MetricsRequestMarshaler> delegate,
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
AggregationTemporalitySelector aggregationTemporalitySelector,
DefaultAggregationSelector defaultAggregationSelector,
MemoryMode memoryMode) {
Expand Down Expand Up @@ -103,8 +108,24 @@ public MemoryMode getMemoryMode() {
*/
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
MetricsRequestMarshaler exportRequest = MetricsRequestMarshaler.create(metrics);
return delegate.export(exportRequest, metrics.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationMetricsRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationMetricsRequestMarshaler();
}
LowAllocationMetricsRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(metrics);
return delegate
.export(exportMarshaler, metrics.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
MetricsRequestMarshaler request = MetricsRequestMarshaler.create(metrics);
return delegate.export(request, metrics.size());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
Expand Down Expand Up @@ -42,16 +42,15 @@ public final class OtlpHttpMetricExporterBuilder {
AggregationTemporalitySelector.alwaysCumulative();
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<MetricsRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private AggregationTemporalitySelector aggregationTemporalitySelector =
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;

private DefaultAggregationSelector defaultAggregationSelector =
DefaultAggregationSelector.getDefault();
private MemoryMode memoryMode;

OtlpHttpMetricExporterBuilder(
HttpExporterBuilder<MetricsRequestMarshaler> delegate, MemoryMode memoryMode) {
OtlpHttpMetricExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
delegate.setMeterProvider(MeterProvider::noop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -22,14 +28,18 @@
@ThreadSafe
public final class OtlpHttpSpanExporter implements SpanExporter {

private final HttpExporterBuilder<TraceRequestMarshaler> builder;
private final HttpExporter<TraceRequestMarshaler> delegate;
private final Deque<LowAllocationTraceRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final MemoryMode memoryMode;

OtlpHttpSpanExporter(
HttpExporterBuilder<TraceRequestMarshaler> builder,
HttpExporter<TraceRequestMarshaler> delegate) {
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.memoryMode = memoryMode;
}

/**
Expand Down Expand Up @@ -61,7 +71,7 @@ public static OtlpHttpSpanExporterBuilder builder() {
* @since 1.29.0
*/
public OtlpHttpSpanExporterBuilder toBuilder() {
return new OtlpHttpSpanExporterBuilder(builder.copy());
return new OtlpHttpSpanExporterBuilder(builder.copy(), memoryMode);
}

/**
Expand All @@ -72,8 +82,24 @@ public OtlpHttpSpanExporterBuilder toBuilder() {
*/
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
return delegate.export(exportRequest, spans.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationTraceRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationTraceRequestMarshaler();
}
LowAllocationTraceRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(spans);
return delegate
.export(exportMarshaler, spans.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
TraceRequestMarshaler request = TraceRequestMarshaler.create(spans);
return delegate.export(request, spans.size());
}

/**
Expand All @@ -94,6 +120,9 @@ public CompletableResultCode shutdown() {

@Override
public String toString() {
return "OtlpHttpSpanExporter{" + builder.toString(false) + "}";
StringJoiner joiner = new StringJoiner(", ", "OtlpHttpSpanExporter{", "}");
joiner.add(builder.toString(false));
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
Expand All @@ -33,16 +34,19 @@
public final class OtlpHttpSpanExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/traces";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<TraceRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;

OtlpHttpSpanExporterBuilder(HttpExporterBuilder<TraceRequestMarshaler> delegate) {
OtlpHttpSpanExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders);
}

OtlpHttpSpanExporterBuilder() {
this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT));
this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE);
}

/**
Expand Down Expand Up @@ -207,12 +211,19 @@ public OtlpHttpSpanExporterBuilder setMeterProvider(
return this;
}

/** Set the {@link MemoryMode}. */
OtlpHttpSpanExporterBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
return this;
}

/**
* Constructs a new instance of the exporter based on the builder's values.
*
* @return a new exporter's instance
*/
public OtlpHttpSpanExporter build() {
return new OtlpHttpSpanExporter(delegate, delegate.build());
return new OtlpHttpSpanExporter(delegate, delegate.build(), memoryMode);
}
}
Loading

0 comments on commit 0d2d67e

Please sign in to comment.