Skip to content

Commit 8e65c05

Browse files
authored
Merge pull request #243 from go-faster/fix/telemetry-leveling
fix(autologs): filter otlp logs by level enabler
2 parents 997b125 + 5cb8444 commit 8e65c05

File tree

4 files changed

+164
-8
lines changed

4 files changed

+164
-8
lines changed

autologs/autologs.go

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/otel/log/noop"
1515
sdklog "go.opentelemetry.io/otel/sdk/log"
1616
"go.uber.org/zap"
17+
"go.uber.org/zap/zapcore"
1718

1819
"github.com/go-faster/sdk/zctx"
1920
)
@@ -58,8 +59,8 @@ type ShutdownFunc func(ctx context.Context) error
5859

5960
// NewLoggerProvider initializes new [log.LoggerProvider] with the given options from environment variables.
6061
func NewLoggerProvider(ctx context.Context, options ...Option) (
61-
meterProvider log.LoggerProvider,
62-
meterShutdown ShutdownFunc,
62+
logProvider log.LoggerProvider,
63+
logShutdown ShutdownFunc,
6364
err error,
6465
) {
6566
cfg := newConfig(options)
@@ -68,11 +69,16 @@ func NewLoggerProvider(ctx context.Context, options ...Option) (
6869
if cfg.res != nil {
6970
logOptions = append(logOptions, sdklog.WithResource(cfg.res))
7071
}
72+
7173
ret := func(e sdklog.Exporter) (log.LoggerProvider, func(ctx context.Context) error, error) {
72-
logOptions = append(logOptions, sdklog.WithProcessor(
73-
sdklog.NewBatchProcessor(e),
74-
))
75-
return sdklog.NewLoggerProvider(logOptions...), e.Shutdown, nil
74+
logOptions = append(logOptions,
75+
sdklog.WithProcessor(&levelFilterProcessor{
76+
next: sdklog.NewBatchProcessor(e),
77+
severity: zapLevelToOTelSeverity(lg.Level()),
78+
}),
79+
)
80+
provider := sdklog.NewLoggerProvider(logOptions...)
81+
return provider, provider.Shutdown, nil
7682
}
7783
exporter := strings.TrimSpace(getEnvOr("OTEL_LOGS_EXPORTER", expOTLP))
7884
switch exporter {
@@ -134,3 +140,57 @@ func NewLoggerProvider(ctx context.Context, options ...Option) (
134140
}
135141
return nil, nil, errors.Errorf("unsupported OTEL_LOGS_EXPORTER %q", exporter)
136142
}
143+
144+
// levelFilterProcessor implements level filtering, since otlplog does not.
145+
//
146+
// Fuck you too, OpenTelemetry.
147+
type levelFilterProcessor struct {
148+
next sdklog.Processor
149+
severity log.Severity
150+
}
151+
152+
var (
153+
_ sdklog.FilterProcessor = (*levelFilterProcessor)(nil)
154+
_ sdklog.Processor = (*levelFilterProcessor)(nil)
155+
)
156+
157+
// Enabled implements [sdklog.FilterProcessor].
158+
func (l *levelFilterProcessor) Enabled(ctx context.Context, param sdklog.EnabledParameters) bool {
159+
return param.Severity >= l.severity
160+
}
161+
162+
// OnEmit implements [sdklog.Processor].
163+
func (l *levelFilterProcessor) OnEmit(ctx context.Context, record *sdklog.Record) error {
164+
return l.next.OnEmit(ctx, record)
165+
}
166+
167+
// ForceFlush implements [sdklog.Processor].
168+
func (l *levelFilterProcessor) ForceFlush(ctx context.Context) error {
169+
return l.next.ForceFlush(ctx)
170+
}
171+
172+
// Shutdown implements [sdklog.Processor].
173+
func (l *levelFilterProcessor) Shutdown(ctx context.Context) error {
174+
return l.next.Shutdown(ctx)
175+
}
176+
177+
func zapLevelToOTelSeverity(level zapcore.Level) log.Severity {
178+
switch level {
179+
case zapcore.DebugLevel:
180+
return log.SeverityDebug
181+
case zapcore.InfoLevel:
182+
return log.SeverityInfo
183+
case zapcore.WarnLevel:
184+
return log.SeverityWarn
185+
case zapcore.ErrorLevel:
186+
return log.SeverityError
187+
case zapcore.DPanicLevel:
188+
return log.SeverityFatal1
189+
case zapcore.PanicLevel:
190+
return log.SeverityFatal2
191+
case zapcore.FatalLevel:
192+
return log.SeverityFatal3
193+
default:
194+
return log.SeverityUndefined
195+
}
196+
}

autologs/autologs_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package autologs_test
2+
3+
import (
4+
"context"
5+
"slices"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
10+
"github.com/go-faster/errors"
11+
"github.com/go-faster/sdk/autologs"
12+
"github.com/go-faster/sdk/zctx"
13+
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/contrib/bridges/otelzap"
15+
sdklog "go.opentelemetry.io/otel/sdk/log"
16+
"go.uber.org/zap"
17+
"go.uber.org/zap/zaptest"
18+
)
19+
20+
func TestNewLoggerProviderLevel(t *testing.T) {
21+
ctx := context.Background()
22+
const testExporterName = "amongus"
23+
t.Setenv("OTEL_LOGS_EXPORTER", testExporterName)
24+
25+
baseLogger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))
26+
ctx = zctx.Base(ctx, baseLogger)
27+
28+
exporter := &testLogExporter{}
29+
provider, shutdown, err := autologs.NewLoggerProvider(ctx, autologs.WithLookupExporter(func(ctx context.Context, name string) (sdklog.Exporter, bool, error) {
30+
if name != testExporterName {
31+
return nil, false, errors.Errorf("wrong exporter %q", name)
32+
}
33+
return exporter, true, nil
34+
}))
35+
require.NoError(t, err)
36+
37+
otelCore := otelzap.NewCore("github.com/go-faster/sdk/app",
38+
otelzap.WithLoggerProvider(provider),
39+
)
40+
otelLg := zap.New(otelCore)
41+
otelLg.Debug("hot and lonely GPUs around you")
42+
otelLg.Info("information")
43+
otelLg.Warn("warning")
44+
45+
require.NoError(t, otelLg.Sync())
46+
require.NoError(t, shutdown(ctx))
47+
require.True(t, exporter.shutdown.Load())
48+
49+
var msgs []string
50+
for _, r := range exporter.Records() {
51+
msgs = append(msgs, r.Body().AsString())
52+
}
53+
require.Equal(t,
54+
[]string{
55+
"information",
56+
"warning",
57+
},
58+
msgs,
59+
)
60+
}
61+
62+
type testLogExporter struct {
63+
records []sdklog.Record
64+
recordsMux sync.Mutex
65+
shutdown atomic.Bool
66+
}
67+
68+
var _ sdklog.Exporter = (*testLogExporter)(nil)
69+
70+
func (t *testLogExporter) Records() []sdklog.Record {
71+
t.recordsMux.Lock()
72+
r := slices.Clone(t.records)
73+
t.recordsMux.Unlock()
74+
return r
75+
}
76+
77+
// Export implements [sdklog.Exporter].
78+
func (t *testLogExporter) Export(ctx context.Context, records []sdklog.Record) error {
79+
t.recordsMux.Lock()
80+
t.records = append(t.records, records...)
81+
t.recordsMux.Unlock()
82+
return nil
83+
}
84+
85+
// ForceFlush implements [sdklog.Exporter].
86+
func (t *testLogExporter) ForceFlush(ctx context.Context) error {
87+
return nil
88+
}
89+
90+
// Shutdown implements [sdklog.Exporter].
91+
func (t *testLogExporter) Shutdown(ctx context.Context) error {
92+
t.shutdown.Store(true)
93+
return nil
94+
}

autometer/autometer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ func NewMeterProvider(ctx context.Context, options ...Option) (
7878

7979
ret := func(r sdkmetric.Reader) (metric.MeterProvider, func(ctx context.Context) error, error) {
8080
metricOptions = append(metricOptions, sdkmetric.WithReader(r))
81-
return sdkmetric.NewMeterProvider(metricOptions...), r.Shutdown, nil
81+
provider := sdkmetric.NewMeterProvider(metricOptions...)
82+
return provider, provider.Shutdown, nil
8283
}
8384

8485
// Metrics exporter.

autotracer/autotracer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ func NewTracerProvider(ctx context.Context, options ...Option) (
7070
}
7171
ret := func(e sdktrace.SpanExporter) (trace.TracerProvider, func(ctx context.Context) error, error) {
7272
traceOptions = append(traceOptions, sdktrace.WithBatcher(e))
73-
return sdktrace.NewTracerProvider(traceOptions...), e.Shutdown, nil
73+
provider := sdktrace.NewTracerProvider(traceOptions...)
74+
return provider, provider.Shutdown, nil
7475
}
7576

7677
exporter := strings.TrimSpace(getEnvOr("OTEL_TRACES_EXPORTER", expOTLP))

0 commit comments

Comments
 (0)