Skip to content

Use a more efficient serializer and write bytes directly to disk #2138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions disk-buffering/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ val protos by configurations.creating
dependencies {
api("io.opentelemetry:opentelemetry-sdk")
implementation("io.opentelemetry:opentelemetry-api-incubator")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
signature("com.toasttab.android:gummy-bears-api-21:0.12.0:coreLib@signature")
Expand Down Expand Up @@ -63,9 +64,10 @@ wire {
}

root(
"opentelemetry.proto.trace.v1.TracesData",
"opentelemetry.proto.metrics.v1.MetricsData",
"opentelemetry.proto.logs.v1.LogsData",
// These are the types used by the Java SDK's OTLP exporters.
"opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest",
"opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest",
"opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest",
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;

class NoopSerializer<T> implements SignalSerializer<T> {

@Override
public NoopSerializer<T> initialize(Collection<T> data) {
return this;
}

@Override
public void writeBinaryTo(OutputStream output) throws IOException {}

@Override
public int getBinarySerializedSize() {
return 0;
}

@Override
public void reset() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ public static <T> ToDiskExporterBuilder<T> builder(Storage storage) {
return new ToDiskExporterBuilder<>(storage);
}

public CompletableResultCode export(Collection<EXPORT_DATA> data) {
public synchronized CompletableResultCode export(Collection<EXPORT_DATA> data) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why sychronized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serializer is stateful now, plus it seems like a bad idea to allow multiple exports to be writing to disk concurrently anyway.

logger.log("Intercepting exporter batch.", Level.FINER);
try {
if (storage.write(serializer.serialize(data))) {
serializer.initialize(data);
if (storage.write(serializer)) {
return CompletableResultCode.ofSuccess();
}
logger.log("Could not store batch in disk. Exporting it right away.");
Expand All @@ -52,6 +53,8 @@ public CompletableResultCode export(Collection<EXPORT_DATA> data) {
Level.WARNING,
e);
return exportFunction.apply(data);
} finally {
serializer.reset();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

public final class ToDiskExporterBuilder<T> {

private SignalSerializer<T> serializer = ts -> new byte[0];
private SignalSerializer<T> serializer = new NoopSerializer<T>();

private final Storage storage;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.proto.logs.v1.LogsData;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.IOException;
import java.util.List;
Expand All @@ -24,7 +24,8 @@ static LogRecordDataDeserializer getInstance() {
@Override
public List<LogRecordData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
return ProtoLogsDataMapper.getInstance()
.fromProto(ExportLogsServiceRequest.ADAPTER.decode(source));
} catch (IOException e) {
throw new DeserializationException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.io.IOException;
import java.util.List;
Expand All @@ -24,7 +24,8 @@ static MetricDataDeserializer getInstance() {
@Override
public List<MetricData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
return ProtoMetricsDataMapper.getInstance()
.fromProto(ExportMetricsServiceRequest.ADAPTER.decode(source));
} catch (IOException e) {
throw new DeserializationException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.proto.trace.v1.TracesData;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
Expand All @@ -24,7 +24,8 @@ static SpanDataDeserializer getInstance() {
@Override
public List<SpanData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
return ProtoSpansDataMapper.getInstance()
.fromProto(ExportTraceServiceRequest.ADAPTER.decode(source));
} catch (IOException e) {
throw new DeserializationException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ public static ByteStringMapper getInstance() {
}

public ByteString stringToProto(String source) {
return ByteString.encodeUtf8(source);
return ByteString.decodeHex(source);
}

public String protoToString(ByteString source) {
return source.utf8();
return source.hex();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.LogsData;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
Expand All @@ -19,7 +19,7 @@

public final class ProtoLogsDataMapper
extends BaseProtoSignalsDataMapper<
LogRecordData, LogRecord, LogsData, ResourceLogs, ScopeLogs> {
LogRecordData, LogRecord, ExportLogsServiceRequest, ResourceLogs, ScopeLogs> {

private static final ProtoLogsDataMapper INSTANCE = new ProtoLogsDataMapper();

Expand All @@ -39,12 +39,12 @@ protected LogRecordData protoToSignalItem(
}

@Override
protected List<ResourceLogs> getProtoResources(LogsData logsData) {
protected List<ResourceLogs> getProtoResources(ExportLogsServiceRequest logsData) {
return logsData.resource_logs;
}

@Override
protected LogsData createProtoData(
protected ExportLogsServiceRequest createProtoData(
Map<Resource, Map<InstrumentationScopeInfo, List<LogRecord>>> itemsByResource) {
List<ResourceLogs> items = new ArrayList<>();
itemsByResource.forEach(
Expand All @@ -58,7 +58,7 @@ protected LogsData createProtoData(
}
items.add(resourceLogsBuilder.build());
});
return new LogsData.Builder().resource_logs(items).build();
return new ExportLogsServiceRequest.Builder().resource_logs(items).build();
}

private ScopeLogs.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
Expand All @@ -19,7 +19,7 @@

public final class ProtoMetricsDataMapper
extends BaseProtoSignalsDataMapper<
MetricData, Metric, MetricsData, ResourceMetrics, ScopeMetrics> {
MetricData, Metric, ExportMetricsServiceRequest, ResourceMetrics, ScopeMetrics> {

private static final ProtoMetricsDataMapper INSTANCE = new ProtoMetricsDataMapper();

Expand All @@ -39,12 +39,12 @@ protected MetricData protoToSignalItem(
}

@Override
protected List<ResourceMetrics> getProtoResources(MetricsData protoData) {
protected List<ResourceMetrics> getProtoResources(ExportMetricsServiceRequest protoData) {
return protoData.resource_metrics;
}

@Override
protected MetricsData createProtoData(
protected ExportMetricsServiceRequest createProtoData(
Map<Resource, Map<InstrumentationScopeInfo, List<Metric>>> itemsByResource) {
List<ResourceMetrics> items = new ArrayList<>();
itemsByResource.forEach(
Expand All @@ -58,7 +58,7 @@ protected MetricsData createProtoData(
}
items.add(resourceMetricsBuilder.build());
});
return new MetricsData.Builder().resource_metrics(items).build();
return new ExportMetricsServiceRequest.Builder().resource_metrics(items).build();
}

private ScopeMetrics.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.Span;
import io.opentelemetry.proto.trace.v1.TracesData;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.SpanData;
Expand All @@ -18,7 +18,8 @@
import java.util.Map;

public final class ProtoSpansDataMapper
extends BaseProtoSignalsDataMapper<SpanData, Span, TracesData, ResourceSpans, ScopeSpans> {
extends BaseProtoSignalsDataMapper<
SpanData, Span, ExportTraceServiceRequest, ResourceSpans, ScopeSpans> {

private static final ProtoSpansDataMapper INSTANCE = new ProtoSpansDataMapper();

Expand All @@ -32,7 +33,7 @@ protected Span signalItemToProto(SpanData sourceData) {
}

@Override
protected List<ResourceSpans> getProtoResources(TracesData protoData) {
protected List<ResourceSpans> getProtoResources(ExportTraceServiceRequest protoData) {
return protoData.resource_spans;
}

Expand All @@ -43,7 +44,7 @@ protected SpanData protoToSignalItem(
}

@Override
protected TracesData createProtoData(
protected ExportTraceServiceRequest createProtoData(
Map<Resource, Map<InstrumentationScopeInfo, List<Span>>> itemsByResource) {
List<ResourceSpans> items = new ArrayList<>();
itemsByResource.forEach(
Expand All @@ -57,7 +58,7 @@ protected TracesData createProtoData(
}
items.add(resourceSpansBuilder.build());
});
return new TracesData.Builder().resource_spans(items).build();
return new ExportTraceServiceRequest.Builder().resource_spans(items).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,41 @@

package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools;
import io.opentelemetry.proto.logs.v1.LogsData;
import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;

public final class LogRecordDataSerializer implements SignalSerializer<LogRecordData> {
private static final LogRecordDataSerializer INSTANCE = new LogRecordDataSerializer();

private LogRecordDataSerializer() {}
private final LowAllocationLogsRequestMarshaler marshaler =
new LowAllocationLogsRequestMarshaler();

static LogRecordDataSerializer getInstance() {
return INSTANCE;
LogRecordDataSerializer() {}

@CanIgnoreReturnValue
@Override
public LogRecordDataSerializer initialize(Collection<LogRecordData> data) {
marshaler.initialize(data);
return this;
}

@Override
public void writeBinaryTo(OutputStream output) throws IOException {
ProtobufTools.writeRawVarint32(marshaler.getBinarySerializedSize(), output);
marshaler.writeBinaryTo(output);
}

@Override
public int getBinarySerializedSize() {
return marshaler.getBinarySerializedSize();
}

@Override
public byte[] serialize(Collection<LogRecordData> logRecordData) {
LogsData proto = ProtoLogsDataMapper.getInstance().toProto(logRecordData);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
int size = LogsData.ADAPTER.encodedSize(proto);
ProtobufTools.writeRawVarint32(size, out);
proto.encode(out);
return out.toByteArray();
} catch (IOException e) {
throw new IllegalStateException(e);
}
public void reset() {
marshaler.reset();
}
}
Loading
Loading