Skip to content

Commit

Permalink
Small cleanup for batchprocessor split logic (open-telemetry#4284)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 28, 2021
1 parent eb3601a commit 87405dd
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 196 deletions.
83 changes: 31 additions & 52 deletions processor/batchprocessor/splitlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,58 +127,6 @@ func TestSplitLogsMultipleResourceLogs_split_size_greater_than_log_size(t *testi
assert.Equal(t, "test-log-int-1-4", split.ResourceLogs().At(1).InstrumentationLibraryLogs().At(0).Logs().At(4).Name())
}

// BenchmarkSplitLogs measures splitLogs when a 128-record batch is carved out
// of a larger payload assembled from 20 generated ResourceLogs entries (the
// assertions below expect 400 log records in total).
//
// splitLogs is expected to shrink its input (the remaining count is checked
// after each call), so every iteration gets its own clone. The clones are
// built before the timer is reset so only the split itself is measured.
func BenchmarkSplitLogs(b *testing.B) {
	md := pdata.NewLogs()
	rls := md.ResourceLogs()
	for i := 0; i < 20; i++ {
		testdata.GenerateLogsManyLogRecordsSameResource(20).ResourceLogs().MoveAndAppendTo(md.ResourceLogs())
		// Rename the records of the entry that was just appended.
		ls := rls.At(rls.Len() - 1).InstrumentationLibraryLogs().At(0).Logs()
		for j := 0; j < ls.Len(); j++ {
			ls.At(j).SetName(getTestLogName(1, j))
		}
	}

	// One clone per iteration is held in memory, so cap b.N. The flag that
	// bounds the iteration count is -benchtime (e.g. -benchtime=1000x).
	if b.N > 100000 {
		b.Skipf("SKIP: b.N too high, set -benchtime=<n>x with n < 100000")
	}

	clones := make([]pdata.Logs, b.N)
	for n := 0; n < b.N; n++ {
		clones[n] = md.Clone()
	}

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cloneReq := clones[n]
		split := splitLogs(128, cloneReq)
		// The split must hold exactly 128 records and the source the remainder.
		if split.LogRecordCount() != 128 || cloneReq.LogRecordCount() != 400-128 {
			b.Fail()
		}
	}
}

// BenchmarkCloneLogs measures the cost of Clone on a payload assembled from
// 20 generated ResourceLogs entries; each clone is expected to report 400 log
// records.
func BenchmarkCloneLogs(b *testing.B) {
	logs := pdata.NewLogs()
	resourceLogs := logs.ResourceLogs()
	for r := 0; r < 20; r++ {
		generated := testdata.GenerateLogsManyLogRecordsSameResource(20)
		generated.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())

		// Rename the records of the entry that was just appended.
		last := resourceLogs.At(resourceLogs.Len() - 1)
		records := last.InstrumentationLibraryLogs().At(0).Logs()
		for idx := 0; idx < records.Len(); idx++ {
			records.At(idx).SetName(getTestLogName(1, idx))
		}
	}

	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		if copied := logs.Clone(); copied.LogRecordCount() != 400 {
			b.Fail()
		}
	}
}

func TestSplitLogsMultipleILL(t *testing.T) {
td := testdata.GenerateLogsManyLogRecordsSameResource(20)
logs := td.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
Expand Down Expand Up @@ -208,3 +156,34 @@ func TestSplitLogsMultipleILL(t *testing.T) {
assert.Equal(t, "test-log-int-0-0", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Name())
assert.Equal(t, "test-log-int-0-4", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(4).Name())
}

// BenchmarkSplitLogs measures splitLogs when a 128-record batch is requested
// from a payload assembled from 20 generated ResourceLogs entries (400 log
// records in total, per the assertion below).
//
// Each iteration works on its own clone, prepared before the timer is reset,
// because the input is expected to shrink after the split.
func BenchmarkSplitLogs(b *testing.B) {
	logs := pdata.NewLogs()
	resourceLogs := logs.ResourceLogs()
	for r := 0; r < 20; r++ {
		generated := testdata.GenerateLogsManyLogRecordsSameResource(20)
		generated.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())

		// Rename the records of the entry that was just appended.
		last := resourceLogs.At(resourceLogs.Len() - 1)
		records := last.InstrumentationLibraryLogs().At(0).Logs()
		for idx := 0; idx < records.Len(); idx++ {
			records.At(idx).SetName(getTestLogName(1, idx))
		}
	}

	// One clone per iteration is kept in memory, so refuse very large b.N.
	if b.N > 100000 {
		b.Skipf("SKIP: b.N too high, set -benchtime=<n>x with n < 100000")
	}

	inputs := make([]pdata.Logs, b.N)
	for idx := range inputs {
		inputs[idx] = logs.Clone()
	}

	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		input := inputs[iter]
		batch := splitLogs(128, input)
		if batch.LogRecordCount() == 128 && input.LogRecordCount() == 400-128 {
			continue
		}
		b.Fail()
	}
}
126 changes: 64 additions & 62 deletions processor/batchprocessor/splitmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,42 +80,46 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
}

// resourceMetricsDataPointCount calculates the total number of data points.
func resourceMetricsDataPointCount(rs pdata.ResourceMetrics) (dataPointCount int) {
for k := 0; k < rs.InstrumentationLibraryMetrics().Len(); k++ {
dataPointCount += metricSliceDataPointCount(rs.InstrumentationLibraryMetrics().At(k).Metrics())
func resourceMetricsDataPointCount(rs pdata.ResourceMetrics) int {
dataPointCount := 0
ilms := rs.InstrumentationLibraryMetrics()
for k := 0; k < ilms.Len(); k++ {
dataPointCount += metricSliceDataPointCount(ilms.At(k).Metrics())
}
return
return dataPointCount
}

// metricSliceDataPointCount calculates the total number of data points.
func metricSliceDataPointCount(ms pdata.MetricSlice) (dataPointCount int) {
func metricSliceDataPointCount(ms pdata.MetricSlice) int {
dataPointCount := 0
for k := 0; k < ms.Len(); k++ {
dataPointCount += metricDataPointCount(ms.At(k))
}
return
return dataPointCount
}

// metricDataPointCount calculates the total number of data points.
func metricDataPointCount(ms pdata.Metric) (dataPointCount int) {
func metricDataPointCount(ms pdata.Metric) int {
switch ms.DataType() {
case pdata.MetricDataTypeGauge:
dataPointCount = ms.Gauge().DataPoints().Len()
return ms.Gauge().DataPoints().Len()
case pdata.MetricDataTypeSum:
dataPointCount = ms.Sum().DataPoints().Len()
return ms.Sum().DataPoints().Len()
case pdata.MetricDataTypeHistogram:
dataPointCount = ms.Histogram().DataPoints().Len()
return ms.Histogram().DataPoints().Len()
case pdata.MetricDataTypeSummary:
dataPointCount = ms.Summary().DataPoints().Len()
return ms.Summary().DataPoints().Len()
}
return
return 0
}

// splitMetric removes metric points from the input data and moves data of the specified size to destination.
// Returns size of moved data and boolean describing, whether the metric should be removed from original slice.
func splitMetric(ms, dest pdata.Metric, size int) (int, bool) {
if metricDataPointCount(ms) <= size {
mdDPC := metricDataPointCount(ms)
if mdDPC <= size {
ms.MoveTo(dest)
return metricDataPointCount(dest), true
return mdDPC, true
}

dest.SetDataType(ms.DataType())
Expand All @@ -125,57 +129,55 @@ func splitMetric(ms, dest pdata.Metric, size int) (int, bool) {

switch ms.DataType() {
case pdata.MetricDataTypeGauge:
src := ms.Gauge().DataPoints()
dst := dest.Gauge().DataPoints()
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pdata.NumberDataPoint) bool {
defer func() { i++ }()
if i < size {
dp.MoveTo(dst.AppendEmpty())
return true
}
return false
})
return splitNumberDataPoints(ms.Gauge().DataPoints(), dest.Gauge().DataPoints(), size)
case pdata.MetricDataTypeSum:
src := ms.Sum().DataPoints()
dst := dest.Sum().DataPoints()
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pdata.NumberDataPoint) bool {
defer func() { i++ }()
if i < size {
dp.MoveTo(dst.AppendEmpty())
return true
}
return false
})
return splitNumberDataPoints(ms.Sum().DataPoints(), dest.Sum().DataPoints(), size)
case pdata.MetricDataTypeHistogram:
src := ms.Histogram().DataPoints()
dst := dest.Histogram().DataPoints()
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pdata.HistogramDataPoint) bool {
defer func() { i++ }()
if i < size {
dp.MoveTo(dst.AppendEmpty())
return true
}
return false
})
return splitHistogramDataPoints(ms.Histogram().DataPoints(), dest.Histogram().DataPoints(), size)
case pdata.MetricDataTypeSummary:
src := ms.Summary().DataPoints()
dst := dest.Summary().DataPoints()
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pdata.SummaryDataPoint) bool {
defer func() { i++ }()
if i < size {
dp.MoveTo(dst.AppendEmpty())
return true
}
return false
})
return splitSummaryDataPoints(ms.Summary().DataPoints(), dest.Summary().DataPoints(), size)
}
return size, false
}

// splitNumberDataPoints moves up to size number data points out of src and
// into dst: the first size points that RemoveIf visits are moved and removed
// from src, the rest are left in place.
//
// The result is always (size, false), regardless of how many points were
// actually moved.
func splitNumberDataPoints(src, dst pdata.NumberDataPointSlice, size int) (int, bool) {
	dst.EnsureCapacity(size)
	moved := 0
	src.RemoveIf(func(point pdata.NumberDataPoint) bool {
		if moved >= size {
			// Quota reached: keep the remaining points in src.
			return false
		}
		point.MoveTo(dst.AppendEmpty())
		moved++
		return true
	})
	return size, false
}

// splitHistogramDataPoints moves up to size histogram data points out of src
// and into dst: the first size points that RemoveIf visits are moved and
// removed from src, the rest are left in place.
//
// The result is always (size, false), regardless of how many points were
// actually moved.
func splitHistogramDataPoints(src, dst pdata.HistogramDataPointSlice, size int) (int, bool) {
	dst.EnsureCapacity(size)
	moved := 0
	src.RemoveIf(func(point pdata.HistogramDataPoint) bool {
		if moved >= size {
			// Quota reached: keep the remaining points in src.
			return false
		}
		point.MoveTo(dst.AppendEmpty())
		moved++
		return true
	})
	return size, false
}

// splitSummaryDataPoints moves up to size summary data points out of src and
// into dst: the first size points that RemoveIf visits are moved and removed
// from src, the rest are left in place.
//
// The result is always (size, false), regardless of how many points were
// actually moved.
func splitSummaryDataPoints(src, dst pdata.SummaryDataPointSlice, size int) (int, bool) {
	dst.EnsureCapacity(size)
	moved := 0
	src.RemoveIf(func(point pdata.SummaryDataPoint) bool {
		if moved >= size {
			// Quota reached: keep the remaining points in src.
			return false
		}
		point.MoveTo(dst.AppendEmpty())
		moved++
		return true
	})
	return size, false
}
83 changes: 31 additions & 52 deletions processor/batchprocessor/splitmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,58 +136,6 @@ func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *tes
assert.Equal(t, "test-metric-int-1-4", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

// BenchmarkSplitMetrics measures splitMetrics when a batch worth 128 metrics
// is carved out of a payload assembled from 20 generated ResourceMetrics
// entries (the assertions below expect 400 metrics in total).
//
// The split size is given in data points, so it is scaled by the data point
// count of the first metric.
// NOTE(review): this presumes every generated metric carries the same number
// of data points — confirm against testdata.
//
// splitMetrics is expected to shrink its input (the remaining count is checked
// after each call), so every iteration gets its own clone. The clones are
// built before the timer is reset so only the split itself is measured.
func BenchmarkSplitMetrics(b *testing.B) {
	md := pdata.NewMetrics()
	rms := md.ResourceMetrics()
	for i := 0; i < 20; i++ {
		testdata.GenerateMetricsManyMetricsSameResource(20).ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
		// Rename the metrics of the entry that was just appended.
		ms := rms.At(rms.Len() - 1).InstrumentationLibraryMetrics().At(0).Metrics()
		for j := 0; j < ms.Len(); j++ {
			ms.At(j).SetName(getTestMetricName(1, j))
		}
	}

	// One clone per iteration is held in memory, so cap b.N. The flag that
	// bounds the iteration count is -benchtime (e.g. -benchtime=1000x).
	if b.N > 100000 {
		b.Skipf("SKIP: b.N too high, set -benchtime=<n>x with n < 100000")
	}

	dataPointCount := metricDataPointCount(md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0))
	clones := make([]pdata.Metrics, b.N)
	for n := 0; n < b.N; n++ {
		clones[n] = md.Clone()
	}
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cloneReq := clones[n]
		split := splitMetrics(128*dataPointCount, cloneReq)
		// The split must hold exactly 128 metrics and the source the remainder.
		if split.MetricCount() != 128 || cloneReq.MetricCount() != 400-128 {
			b.Fail()
		}
	}
}

// BenchmarkCloneMetrics measures the cost of Clone on a payload assembled
// from 20 generated ResourceMetrics entries; each clone is expected to report
// 400 metrics.
func BenchmarkCloneMetrics(b *testing.B) {
	metrics := pdata.NewMetrics()
	resourceMetrics := metrics.ResourceMetrics()
	for r := 0; r < 20; r++ {
		generated := testdata.GenerateMetricsManyMetricsSameResource(20)
		generated.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())

		// Rename the metrics of the entry that was just appended.
		last := resourceMetrics.At(resourceMetrics.Len() - 1)
		entries := last.InstrumentationLibraryMetrics().At(0).Metrics()
		for idx := 0; idx < entries.Len(); idx++ {
			entries.At(idx).SetName(getTestMetricName(1, idx))
		}
	}

	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		if copied := metrics.Clone(); copied.MetricCount() != 400 {
			b.Fail()
		}
	}
}

func TestSplitMetricsUneven(t *testing.T) {
md := testdata.GenerateMetricsManyMetricsSameResource(10)
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
Expand Down Expand Up @@ -274,3 +222,34 @@ func TestSplitMetricsMultipleILM(t *testing.T) {
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

// BenchmarkSplitMetrics measures splitMetrics when a batch worth 128 metrics
// is requested from a payload assembled from 20 generated ResourceMetrics
// entries (400 metrics in total, per the assertion below).
//
// The requested size is expressed in data points, hence the multiplication by
// the data point count of the first metric. Each iteration works on its own
// clone, prepared before the timer is reset, because the input is expected to
// shrink after the split.
func BenchmarkSplitMetrics(b *testing.B) {
	metrics := pdata.NewMetrics()
	resourceMetrics := metrics.ResourceMetrics()
	for r := 0; r < 20; r++ {
		generated := testdata.GenerateMetricsManyMetricsSameResource(20)
		generated.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())

		// Rename the metrics of the entry that was just appended.
		last := resourceMetrics.At(resourceMetrics.Len() - 1)
		entries := last.InstrumentationLibraryMetrics().At(0).Metrics()
		for idx := 0; idx < entries.Len(); idx++ {
			entries.At(idx).SetName(getTestMetricName(1, idx))
		}
	}

	// One clone per iteration is kept in memory, so refuse very large b.N.
	if b.N > 100000 {
		b.Skipf("SKIP: b.N too high, set -benchtime=<n>x with n < 100000")
	}

	firstMetric := metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0)
	pointsPerMetric := metricDataPointCount(firstMetric)

	inputs := make([]pdata.Metrics, b.N)
	for idx := range inputs {
		inputs[idx] = metrics.Clone()
	}

	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		input := inputs[iter]
		batch := splitMetrics(128*pointsPerMetric, input)
		if batch.MetricCount() == 128 && input.MetricCount() == 400-128 {
			continue
		}
		b.Fail()
	}
}
Loading

0 comments on commit 87405dd

Please sign in to comment.