Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Change Processor.OnEmit to return record passed to next registered processor #5470

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
}

// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) (Record, error) {
if b.stopped.Load() || b.q == nil {
return nil
return r, nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
select {
Expand All @@ -189,7 +189,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
// records.
}
}
return nil
return r, nil
}

// Enabled returns if b is enabled.
Expand Down
48 changes: 32 additions & 16 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func TestEmptyBatchConfig(t *testing.T) {
var bp BatchProcessor
ctx := context.Background()
var record Record
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
record.SetBody(log.StringValue("message"))
r, err := bp.OnEmit(ctx, record)
assert.Equal(t, record, r, "OnEmit record")
assert.NoError(t, err, "OnEmit err")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
Expand Down Expand Up @@ -198,7 +201,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, size) {
assert.NoError(t, b.OnEmit(ctx, r))
_, err := b.OnEmit(ctx, r)
assert.NoError(t, err)
}
var got []Record
assert.Eventually(t, func() bool {
Expand All @@ -221,7 +225,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 10*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
_, err := b.OnEmit(ctx, r)
assert.NoError(t, err)
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
Expand All @@ -244,7 +249,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 2*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
_, err := b.OnEmit(ctx, r)
assert.NoError(t, err)
}

var n int
Expand All @@ -255,7 +261,7 @@ func TestBatchProcessor(t *testing.T) {

var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, Record{})
_, err = b.OnEmit(ctx, Record{})
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
Expand Down Expand Up @@ -303,15 +309,17 @@ func TestBatchProcessor(t *testing.T) {
assert.NoError(t, b.Shutdown(ctx))

want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, Record{}))
_, err := b.OnEmit(ctx, Record{})
assert.NoError(t, err)
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})

t.Run("ForceFlush", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)

assert.NoError(t, b.OnEmit(ctx, Record{}))
_, err := b.OnEmit(ctx, Record{})
assert.NoError(t, err)
assert.NoError(t, b.Shutdown(ctx))

assert.NoError(t, b.ForceFlush(ctx))
Expand Down Expand Up @@ -346,7 +354,8 @@ func TestBatchProcessor(t *testing.T) {

var r Record
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))
_, err := b.OnEmit(ctx, r)
require.NoError(t, err)

assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned")
assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls")
Expand Down Expand Up @@ -381,7 +390,8 @@ func TestBatchProcessor(t *testing.T) {

// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, Record{}))
_, err := b.OnEmit(ctx, Record{})
require.NoError(t, err)
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
Expand Down Expand Up @@ -425,7 +435,7 @@ func TestBatchProcessor(t *testing.T) {

var r Record
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
_, _ = b.OnEmit(ctx, r)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
t.Cleanup(func() { close(e.ExportTrigger) })

Expand Down Expand Up @@ -455,21 +465,26 @@ func TestBatchProcessor(t *testing.T) {
)
var r Record
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")

_, err := b.OnEmit(ctx, r)
assert.NoError(t, err, "exported record")
require.Eventually(t, func() bool {
return e.ExportN() > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")

// Second record will be written to export queue
assert.NoError(t, b.OnEmit(ctx, r), "export queue record")
_, err = b.OnEmit(ctx, r)
assert.NoError(t, err, "export queue record")
require.Eventually(t, func() bool {
return len(b.exporter.input) == cap(b.exporter.input)
}, 2*time.Second, time.Microsecond, "blocked queue read not attempted")

// Third record will be written to BatchProcessor.q
assert.NoError(t, b.OnEmit(ctx, r), "first queued")
_, err = b.OnEmit(ctx, r)
assert.NoError(t, err, "first queued")
// The previous record will be dropped, as the new one will be written to BatchProcessor.q
assert.NoError(t, b.OnEmit(ctx, r), "second queued")
_, err = b.OnEmit(ctx, r)
assert.NoError(t, err, "second queued")

wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1`
assert.Eventually(t, func() bool {
Expand Down Expand Up @@ -497,7 +512,8 @@ func TestBatchProcessor(t *testing.T) {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, Record{}))
_, err := b.OnEmit(ctx, Record{})
assert.NoError(t, err)
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
Expand Down Expand Up @@ -663,7 +679,7 @@ func BenchmarkBatchProcessorOnEmit(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
var err error
for pb.Next() {
err = bp.OnEmit(ctx, r)
r, err = bp.OnEmit(ctx, r)
}
_ = err
})
Expand Down
86 changes: 63 additions & 23 deletions sdk/log/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,67 @@ import (
func BenchmarkProcessor(b *testing.B) {
for _, tc := range []struct {
name string
f func() Processor
f func() []Processor
}{
{
name: "Simple",
f: func() Processor {
return NewSimpleProcessor(noopExporter{})
f: func() []Processor {
return []Processor{
NewSimpleProcessor(noopExporter{}),
}
},
},
{
name: "Batch",
f: func() Processor {
return NewBatchProcessor(noopExporter{})
f: func() []Processor {
return []Processor{
NewBatchProcessor(noopExporter{}),
}
},
},
{
name: "ModifyTimestampSimple",
f: func() Processor {
return timestampDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []Processor {
return []Processor{
timestampSetter{},
NewSimpleProcessor(noopExporter{}),
}
},
},
{
name: "ModifyTimestampBatch",
f: func() Processor {
return timestampDecorator{NewBatchProcessor(noopExporter{})}
f: func() []Processor {
return []Processor{
timestampSetter{},
NewBatchProcessor(noopExporter{}),
}
},
},
{
name: "ModifyAttributesSimple",
f: func() Processor {
return attrDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []Processor {
return []Processor{
attrSetter{},
NewSimpleProcessor(noopExporter{}),
}
},
},
{
name: "ModifyAttributesBatch",
f: func() Processor {
return attrDecorator{NewBatchProcessor(noopExporter{})}
f: func() []Processor {
return []Processor{
attrSetter{},
NewBatchProcessor(noopExporter{}),
}
},
},
} {
b.Run(tc.name, func(b *testing.B) {
provider := NewLoggerProvider(WithProcessor(tc.f()))
var opts []LoggerProviderOption
for _, p := range tc.f() {
opts = append(opts, WithProcessor(p))
}
provider := NewLoggerProvider(opts...)
b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) })
logger := provider.Logger(b.Name())

Expand All @@ -79,22 +99,42 @@ func BenchmarkProcessor(b *testing.B) {
}
}

type timestampDecorator struct {
Processor
}
type timestampSetter struct{}

func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
func (e timestampSetter) OnEmit(ctx context.Context, r Record) (Record, error) {
r = r.Clone()
r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
return e.Processor.OnEmit(ctx, r)
return r, nil
}

func (e timestampSetter) Enabled(context.Context, Record) bool {
return true
}

type attrDecorator struct {
Processor
func (e timestampSetter) Shutdown(ctx context.Context) error {
return nil
}

func (e attrDecorator) OnEmit(ctx context.Context, r Record) error {
func (e timestampSetter) ForceFlush(ctx context.Context) error {
return nil
}

type attrSetter struct{}

func (e attrSetter) OnEmit(ctx context.Context, r Record) (Record, error) {
r = r.Clone()
r.SetAttributes(log.String("replace", "me"))
return e.Processor.OnEmit(ctx, r)
return r, nil
}

func (e attrSetter) Enabled(context.Context, Record) bool {
return true
}

func (e attrSetter) Shutdown(ctx context.Context) error {
return nil
}

func (e attrSetter) ForceFlush(ctx context.Context) error {
return nil
}
5 changes: 3 additions & 2 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger {
}

func (l *logger) Emit(ctx context.Context, r log.Record) {
newRecord := l.newRecord(ctx, r)
record := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if err := p.OnEmit(ctx, newRecord); err != nil {
var err error
if record, err = p.OnEmit(ctx, record); err != nil {
otel.Handle(err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ type Processor interface {
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record Record) error
//
// The returned record is passed to the next registered processor.
OnEmit(ctx context.Context, record Record) (Record, error)
// Enabled returns whether the Processor will process for the given context
// and record.
//
Expand Down
6 changes: 3 additions & 3 deletions sdk/log/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func newProcessor(name string) *processor {
return &processor{Name: name, enabled: true}
}

func (p *processor) OnEmit(ctx context.Context, r Record) error {
func (p *processor) OnEmit(ctx context.Context, r Record) (Record, error) {
if p.Err != nil {
return p.Err
return r, p.Err
}

p.records = append(p.records, r)
return nil
return r, nil
}

func (p *processor) Enabled(context.Context, Record) bool {
Expand Down
4 changes: 2 additions & 2 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimplePr
}

// OnEmit batches provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error {
return s.exporter.Export(ctx, []Record{r})
func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) (Record, error) {
return r, s.exporter.Export(ctx, []Record{r})
}

// Enabled returns true.
Expand Down
7 changes: 4 additions & 3 deletions sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ func TestSimpleProcessorOnEmit(t *testing.T) {

var r log.Record
r.SetSeverityText("test")
_ = s.OnEmit(context.Background(), r)
nextR, _ := s.OnEmit(context.Background(), r)

require.True(t, e.exportCalled, "exporter Export not called")
assert.Equal(t, []log.Record{r}, e.records)
assert.Equal(t, r, nextR)
}

func TestSimpleProcessorEnabled(t *testing.T) {
Expand Down Expand Up @@ -83,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
go func() {
defer wg.Done()

_ = s.OnEmit(ctx, r)
_, _ = s.OnEmit(ctx, r)
_ = s.Enabled(ctx, r)
_ = s.Shutdown(ctx)
_ = s.ForceFlush(ctx)
Expand All @@ -105,7 +106,7 @@ func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
var out error

for pb.Next() {
out = s.OnEmit(ctx, r)
r, out = s.OnEmit(ctx, r)
}

_ = out
Expand Down
Loading