Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log records dropped by the BatchProcessor #5276

Merged
merged 11 commits into from
May 8, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `RecordFactory` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the bridge implementations. (#5263)
- Add `RecordFactory` in `go.opentelemetry.io/otel/sdk/log/logtest` to facilitate testing the exporter and processor implementations. (#5258)
- Add example for `go.opentelemetry.io/otel/exporters/stdout/stdoutlog`. (#5242)
- The count of dropped records from the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is logged. (#5276)

### Changed

Expand Down
14 changes: 14 additions & 0 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/internal/global"
)

const (
Expand Down Expand Up @@ -148,6 +150,10 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
return
}

if d := b.q.Dropped(); d > 0 {
global.Warn("dropped log records", "dropped", d)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}

qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
Expand Down Expand Up @@ -253,6 +259,7 @@ func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
type queue struct {
sync.Mutex

dropped atomic.Uint64
cap, len int
read, write *ring
}
Expand All @@ -266,6 +273,12 @@ func newQueue(size int) *queue {
}
}

// Dropped returns the number of Records dropped during enqueueing since the
// last time Dropped was called.
func (q *queue) Dropped() uint64 {
return q.dropped.Swap(0)
}

// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
Expand All @@ -283,6 +296,7 @@ func (q *queue) Enqueue(r Record) int {
// Overflow. Advance read to be the new "oldest".
q.len = q.cap
q.read = q.read.Next()
q.dropped.Add(1)
}
return q.len
}
Expand Down
51 changes: 51 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"bytes"
"context"
stdlog "log"
"slices"
"strconv"
"sync"
"testing"
"time"
"unsafe"

"github.com/go-logr/stdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
)

Expand Down Expand Up @@ -413,6 +417,41 @@ func TestBatchProcessor(t *testing.T) {
})
})

t.Run("DroppedLogs", func(t *testing.T) {
orig := global.GetLogger()
t.Cleanup(func() { global.SetLogger(orig) })
buf := new(bytes.Buffer)
stdr.SetVerbosity(1)
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))

e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})

b := NewBatchProcessor(
e,
WithMaxQueueSize(1),
WithExportMaxBatchSize(1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
assert.NoError(t, b.OnEmit(ctx, r), "queued")
assert.NoError(t, b.OnEmit(ctx, r), "dropped")

var n int
require.Eventually(t, func() bool {
n = e.ExportN()
return n > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")

got := buf.String()
want := `"level"=1 "msg"="dropped log records" "dropped"=1`
assert.Contains(t, got, want)

close(e.ExportTrigger)
_ = b.Shutdown(ctx)
})

t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10

Expand Down Expand Up @@ -488,6 +527,18 @@ func TestQueue(t *testing.T) {
assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records")
})

t.Run("Dropped", func(t *testing.T) {
q := newQueue(1)

_ = q.Enqueue(r)
_ = q.Enqueue(r)
assert.Equal(t, uint64(1), q.Dropped(), "fist")

_ = q.Enqueue(r)
_ = q.Enqueue(r)
assert.Equal(t, uint64(2), q.Dropped(), "second")
})

t.Run("Flush", func(t *testing.T) {
const size = 2
q := newQueue(size)
Expand Down
2 changes: 1 addition & 1 deletion sdk/log/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/go-logr/logr v1.4.1
github.com/go-logr/stdr v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/log v0.2.0-alpha
Expand All @@ -13,7 +14,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
golang.org/x/sys v0.20.0 // indirect
Expand Down