diff --git a/model/otlp/pb_marshaler.go b/model/otlp/pb_marshaler.go index 73740861a57..c35e43e3a87 100644 --- a/model/otlp/pb_marshaler.go +++ b/model/otlp/pb_marshaler.go @@ -19,27 +19,32 @@ import ( "go.opentelemetry.io/collector/model/pdata" ) -// NewProtobufTracesMarshaler returns a model.TracesMarshaler. Marshals to OTLP binary protobuf bytes. +// NewProtobufTracesMarshaler returns a pdata.TracesMarshaler. Marshals to OTLP binary protobuf bytes. func NewProtobufTracesMarshaler() pdata.TracesMarshaler { return newPbMarshaler() } -// NewProtobufMetricsMarshaler returns a model.MetricsMarshaler. Marshals to OTLP binary protobuf bytes. +// NewProtobufMetricsMarshaler returns a pdata.MetricsMarshaler. Marshals to OTLP binary protobuf bytes. func NewProtobufMetricsMarshaler() pdata.MetricsMarshaler { return newPbMarshaler() } -// NewProtobufLogsMarshaler returns a model.LogsMarshaler. Marshals to OTLP binary protobuf bytes. +// NewProtobufLogsMarshaler returns a pdata.LogsMarshaler. Marshals to OTLP binary protobuf bytes. func NewProtobufLogsMarshaler() pdata.LogsMarshaler { return newPbMarshaler() } +// TODO(#3842): Figure out how we want to represent/return *Sizers. type pbMarshaler struct{} func newPbMarshaler() *pbMarshaler { return &pbMarshaler{} } +var _ pdata.TracesSizer = (*pbMarshaler)(nil) +var _ pdata.MetricsSizer = (*pbMarshaler)(nil) +var _ pdata.LogsSizer = (*pbMarshaler)(nil) + func (e *pbMarshaler) MarshalLogs(ld pdata.Logs) ([]byte, error) { return internal.LogsToOtlp(ld.InternalRep()).Marshal() } @@ -51,3 +56,15 @@ func (e *pbMarshaler) MarshalMetrics(md pdata.Metrics) ([]byte, error) { func (e *pbMarshaler) MarshalTraces(td pdata.Traces) ([]byte, error) { return internal.TracesToOtlp(td.InternalRep()).Marshal() } + +func (e *pbMarshaler) TracesSize(td pdata.Traces) int { + return internal.TracesToOtlp(td.InternalRep()).Size() +} + +func (e *pbMarshaler) MetricsSize(md pdata.Metrics) int { + return internal.MetricsToOtlp(md.InternalRep()).Size() +} + +func (e *pbMarshaler) LogsSize(ld pdata.Logs) int { + return internal.LogsToOtlp(ld.InternalRep()).Size() +} diff --git a/model/otlp/pb_test.go b/model/otlp/pb_test.go index 87713bab614..b3e284403de 100644 --- a/model/otlp/pb_test.go +++ b/model/otlp/pb_test.go @@ -42,6 +42,65 @@ func TestProtobufTracesUnmarshaler_error(t *testing.T) { assert.Error(t, err) } +func TestProtobufTracesSizer(t *testing.T) { + sizer := NewProtobufTracesMarshaler().(pdata.TracesSizer) + marshaler := NewProtobufTracesMarshaler() + td := pdata.NewTraces() + rms := td.ResourceSpans() + rms.AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty().SetName("foo") + + size := sizer.TracesSize(td) + + bytes, err := marshaler.MarshalTraces(td) + require.NoError(t, err) + assert.Equal(t, len(bytes), size) +} + +func TestProtobufTracesSizer_withNil(t *testing.T) { + sizer := NewProtobufTracesMarshaler().(pdata.TracesSizer) + + assert.Equal(t, 0, sizer.TracesSize(pdata.NewTraces())) +} + +func TestProtobufMetricsSizer(t *testing.T) { + sizer := NewProtobufMetricsMarshaler().(pdata.MetricsSizer) + marshaler := NewProtobufMetricsMarshaler() + md := pdata.NewMetrics() + md.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("foo") + + size := sizer.MetricsSize(md) + + bytes, err := marshaler.MarshalMetrics(md) + require.NoError(t, err) + assert.Equal(t, len(bytes), size) +} + +func TestProtobufMetricsSizer_withNil(t *testing.T) { + sizer := NewProtobufMetricsMarshaler().(pdata.MetricsSizer) + + assert.Equal(t, 0, sizer.MetricsSize(pdata.NewMetrics())) +} + +func TestProtobufLogsSizer(t *testing.T) { + sizer := NewProtobufLogsMarshaler().(pdata.LogsSizer) + marshaler := NewProtobufLogsMarshaler() + ld := pdata.NewLogs() + ld.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty().SetName("foo") + + size := sizer.LogsSize(ld) + + bytes, err := marshaler.MarshalLogs(ld) + require.NoError(t, err) + assert.Equal(t, len(bytes), size) + +} + +func TestProtobufLogsSizer_withNil(t *testing.T) { + sizer := NewProtobufLogsMarshaler().(pdata.LogsSizer) + + assert.Equal(t, 0, sizer.LogsSize(pdata.NewLogs())) +} + func BenchmarkLogsToProtobuf(b *testing.B) { marshaler := NewProtobufLogsMarshaler() logs := generateBenchmarkLogs(128) diff --git a/model/pdata/logs.go b/model/pdata/logs.go index 87d82c23055..cb57c394eae 100644 --- a/model/pdata/logs.go +++ b/model/pdata/logs.go @@ -24,7 +24,7 @@ import ( type LogsMarshaler interface { // MarshalLogs the given pdata.Logs into bytes. // If the error is not nil, the returned bytes slice cannot be used. - MarshalLogs(td Logs) ([]byte, error) + MarshalLogs(ld Logs) ([]byte, error) } // LogsUnmarshaler unmarshalls bytes into pdata.Logs. @@ -34,6 +34,12 @@ type LogsUnmarshaler interface { UnmarshalLogs(buf []byte) (Logs, error) } +// LogsSizer returns the size of a Logs. +type LogsSizer interface { + // LogsSize returns the size in bytes of a Logs. + LogsSize(ld Logs) int +} + // Logs is the top-level struct that is propagated through the logs pipeline. // // This is a reference type (like builtin map). @@ -85,12 +91,6 @@ func (ld Logs) LogRecordCount() int { return logCount } -// OtlpProtoSize returns the size in bytes of this Logs encoded as OTLP Collector -// ExportLogsServiceRequest ProtoBuf bytes. -func (ld Logs) OtlpProtoSize() int { - return ld.orig.Size() -} - // ResourceLogs returns the ResourceLogsSlice associated with this Logs. func (ld Logs) ResourceLogs() ResourceLogsSlice { return newResourceLogsSlice(&ld.orig.ResourceLogs) diff --git a/model/pdata/metrics.go b/model/pdata/metrics.go index 92f9f5d55fb..2562cb1e57c 100644 --- a/model/pdata/metrics.go +++ b/model/pdata/metrics.go @@ -24,7 +24,7 @@ import ( type MetricsMarshaler interface { // MarshalMetrics the given pdata.Metrics into bytes. // If the error is not nil, the returned bytes slice cannot be used. - MarshalMetrics(td Metrics) ([]byte, error) + MarshalMetrics(md Metrics) ([]byte, error) } // MetricsUnmarshaler unmarshalls bytes into pdata.Metrics. @@ -34,6 +34,12 @@ type MetricsUnmarshaler interface { UnmarshalMetrics(buf []byte) (Metrics, error) } +// MetricsSizer returns the size of a Metrics. +type MetricsSizer interface { + // LogsSize returns the size in bytes of a Metrics. + MetricsSize(md Metrics) int +} + // Metrics is an opaque interface that allows transition to the new internal Metrics data, but also facilitates the // transition to the new components, especially for traces. // @@ -87,12 +93,6 @@ func (md Metrics) MetricCount() int { return metricCount } -// OtlpProtoSize returns the size in bytes of this Metrics encoded as OTLP Collector -// ExportMetricsServiceRequest ProtoBuf bytes. -func (md Metrics) OtlpProtoSize() int { - return md.orig.Size() -} - // DataPointCount calculates the total number of data points. func (md Metrics) DataPointCount() (dataPointCount int) { rms := md.ResourceMetrics() diff --git a/model/pdata/metrics_test.go b/model/pdata/metrics_test.go index 8ac678b29a4..a76cc14c92d 100644 --- a/model/pdata/metrics_test.go +++ b/model/pdata/metrics_test.go @@ -19,7 +19,6 @@ import ( gogoproto "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" goproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" @@ -148,22 +147,6 @@ func TestMetricCount(t *testing.T) { assert.EqualValues(t, 6, md.MetricCount()) } -func TestMetricsSize(t *testing.T) { - assert.Equal(t, 0, NewMetrics().OtlpProtoSize()) - - md := generateMetricsEmptyDataPoints() - orig := md.orig - size := orig.Size() - bytes, err := orig.Marshal() - require.NoError(t, err) - assert.Equal(t, size, md.OtlpProtoSize()) - assert.Equal(t, len(bytes), md.OtlpProtoSize()) -} - -func TestMetricsSizeWithNil(t *testing.T) { - assert.Equal(t, 0, NewMetrics().OtlpProtoSize()) -} - func TestMetricCountWithEmpty(t *testing.T) { assert.EqualValues(t, 0, generateMetricsEmptyResource().MetricCount()) assert.EqualValues(t, 0, generateMetricsEmptyInstrumentation().MetricCount()) diff --git a/model/pdata/traces.go b/model/pdata/traces.go index 6d861894ac5..3b435631c72 100644 --- a/model/pdata/traces.go +++ b/model/pdata/traces.go @@ -34,6 +34,12 @@ type TracesUnmarshaler interface { UnmarshalTraces(buf []byte) (Traces, error) } +// TracesSizer returns the size of a Traces. +type TracesSizer interface { + // TracesSize returns the size in bytes of a Traces. + TracesSize(td Traces) int +} + // Traces is the top-level struct that is propagated through the traces pipeline. type Traces struct { orig *otlpcollectortrace.ExportTraceServiceRequest @@ -77,12 +83,6 @@ func (td Traces) SpanCount() int { return spanCount } -// OtlpProtoSize returns the size in bytes of this Traces encoded as OTLP Collector -// ExportTraceServiceRequest ProtoBuf bytes. -func (td Traces) OtlpProtoSize() int { - return td.orig.Size() -} - // ResourceSpans returns the ResourceSpansSlice associated with this Metrics. func (td Traces) ResourceSpans() ResourceSpansSlice { return newResourceSpansSlice(&td.orig.ResourceSpans) diff --git a/model/pdata/traces_test.go b/model/pdata/traces_test.go index 6e3161ec9b1..6ae3c3c8971 100644 --- a/model/pdata/traces_test.go +++ b/model/pdata/traces_test.go @@ -19,7 +19,6 @@ import ( gogoproto "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" goproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" @@ -52,23 +51,6 @@ func TestSpanCount(t *testing.T) { assert.EqualValues(t, 6, md.SpanCount()) } -func TestTracesSize(t *testing.T) { - assert.Equal(t, 0, NewTraces().OtlpProtoSize()) - td := NewTraces() - rms := td.ResourceSpans() - rms.AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty().SetName("foo") - orig := td.orig - size := orig.Size() - bytes, err := orig.Marshal() - require.NoError(t, err) - assert.Equal(t, size, td.OtlpProtoSize()) - assert.Equal(t, len(bytes), td.OtlpProtoSize()) -} - -func TestTracesSizeWithNil(t *testing.T) { - assert.Equal(t, 0, NewTraces().OtlpProtoSize()) -} - func TestSpanCountWithEmpty(t *testing.T) { assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{ ResourceSpans: []*otlptrace.ResourceSpans{{}}, diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index bcde61c8521..c7ccab58003 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/model/otlp" "go.opentelemetry.io/collector/model/pdata" ) @@ -223,10 +224,11 @@ type batchTraces struct { nextConsumer consumer.Traces traceData pdata.Traces spanCount int + sizer pdata.TracesSizer } func newBatchTraces(nextConsumer consumer.Traces) *batchTraces { - return &batchTraces{nextConsumer: nextConsumer, traceData: pdata.NewTraces()} + return &batchTraces{nextConsumer: nextConsumer, traceData: pdata.NewTraces(), sizer: otlp.NewProtobufTracesMarshaler().(pdata.TracesSizer)} } // add updates current batchTraces by adding new TraceData object @@ -259,17 +261,18 @@ func (bt *batchTraces) itemCount() int { } func (bt *batchTraces) size() int { - return bt.traceData.OtlpProtoSize() + return bt.sizer.TracesSize(bt.traceData) } type batchMetrics struct { nextConsumer consumer.Metrics metricData pdata.Metrics dataPointCount int + sizer pdata.MetricsSizer } func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { - return &batchMetrics{nextConsumer: nextConsumer, metricData: pdata.NewMetrics()} + return &batchMetrics{nextConsumer: nextConsumer, metricData: pdata.NewMetrics(), sizer: otlp.NewProtobufMetricsMarshaler().(pdata.MetricsSizer)} } func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error { @@ -290,7 +293,7 @@ func (bm *batchMetrics) itemCount() int { } func (bm *batchMetrics) size() int { - return bm.metricData.OtlpProtoSize() + return bm.sizer.MetricsSize(bm.metricData) } func (bm *batchMetrics) add(item interface{}) { @@ -308,10 +311,11 @@ type batchLogs struct { nextConsumer consumer.Logs logData pdata.Logs logCount int + sizer pdata.LogsSizer } func newBatchLogs(nextConsumer consumer.Logs) *batchLogs { - return &batchLogs{nextConsumer: nextConsumer, logData: pdata.NewLogs()} + return &batchLogs{nextConsumer: nextConsumer, logData: pdata.NewLogs(), sizer: otlp.NewProtobufLogsMarshaler().(pdata.LogsSizer)} } func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error { @@ -332,7 +336,7 @@ func (bl *batchLogs) itemCount() int { } func (bl *batchLogs) size() int { - return bl.logData.OtlpProtoSize() + return bl.sizer.LogsSize(bl.logData) } func (bl *batchLogs) add(item interface{}) { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 08c282b738c..49f890a437e 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/model/otlp" "go.opentelemetry.io/collector/model/pdata" ) @@ -119,6 +120,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { } func TestBatchProcessorSentBySize(t *testing.T) { + sizer := otlp.NewProtobufTracesMarshaler().(pdata.TracesSizer) views := MetricViews() require.NoError(t, view.Register(views...)) defer view.Unregister(views...) @@ -140,7 +142,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTracesManySpansSameResource(spansPerRequest) - sizeSum += td.OtlpProtoSize() + sizeSum += sizer.TracesSize(td) assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) } @@ -306,6 +308,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { } func TestBatchMetricProcessor_BatchSize(t *testing.T) { + sizer := otlp.NewProtobufMetricsMarshaler().(pdata.MetricsSizer) views := MetricViews() require.NoError(t, view.Register(views...)) defer view.Unregister(views...) @@ -333,7 +336,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) { size := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest) - size += md.OtlpProtoSize() + size += sizer.MetricsSize(md) assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) } require.NoError(t, batcher.Shutdown(context.Background())) @@ -508,9 +511,10 @@ func getTestMetricName(requestNum, index int) string { } func BenchmarkTraceSizeBytes(b *testing.B) { + sizer := otlp.NewProtobufTracesMarshaler().(pdata.TracesSizer) td := testdata.GenerateTracesManySpansSameResource(8192) for n := 0; n < b.N; n++ { - fmt.Println(td.OtlpProtoSize()) + fmt.Println(sizer.TracesSize(td)) } } @@ -620,6 +624,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { } func TestBatchLogProcessor_BatchSize(t *testing.T) { + sizer := otlp.NewProtobufLogsMarshaler().(pdata.LogsSizer) views := MetricViews() require.NoError(t, view.Register(views...)) defer view.Unregister(views...) @@ -645,7 +650,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { size := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogsManyLogRecordsSameResource(logsPerRequest) - size += ld.OtlpProtoSize() + size += sizer.LogsSize(ld) assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) } require.NoError(t, batcher.Shutdown(context.Background()))