Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[otelcol] Add a custom zapcore.Core for confmap logging #10056

Merged
merged 13 commits into from
May 10, 2024
25 changes: 25 additions & 0 deletions .chloggen/confmap-logger-wrapper-idea.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otelcol

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable logging during configuration resolution

# One or more tracking issues or pull requests related to the change
issues: [10056]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
81 changes: 81 additions & 0 deletions otelcol/buffered_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// This logger implements zapcore.Core and is based on zaptest/observer.

package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"fmt"
"sync"

"go.uber.org/zap/zapcore"
)

type loggedEntry struct {
zapcore.Entry
Context []zapcore.Field
}

func newBufferedCore(enab zapcore.LevelEnabler) *bufferedCore {
return &bufferedCore{LevelEnabler: enab}
}

var _ zapcore.Core = (*bufferedCore)(nil)

type bufferedCore struct {
zapcore.LevelEnabler
mu sync.RWMutex
logs []loggedEntry
context []zapcore.Field
logsTaken bool
}

func (bc *bufferedCore) Level() zapcore.Level {
return zapcore.LevelOf(bc.LevelEnabler)
}

func (bc *bufferedCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if bc.Enabled(ent.Level) {
return ce.AddCore(ent, bc)
}
return ce
}

func (bc *bufferedCore) With(fields []zapcore.Field) zapcore.Core {
return &bufferedCore{
LevelEnabler: bc.LevelEnabler,
logs: bc.logs,
logsTaken: bc.logsTaken,
context: append(bc.context, fields...),
}
}

func (bc *bufferedCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
bc.mu.Lock()
defer bc.mu.Unlock()
if bc.logsTaken {
return fmt.Errorf("the buffered logs have already been taken so writing is no longer supported")
}
all := make([]zapcore.Field, 0, len(fields)+len(bc.context))
all = append(all, bc.context...)
all = append(all, fields...)
bc.logs = append(bc.logs, loggedEntry{ent, all})
return nil
}

func (bc *bufferedCore) Sync() error {
return nil
}

func (bc *bufferedCore) TakeLogs() []loggedEntry {
if !bc.logsTaken {
bc.mu.Lock()
defer bc.mu.Unlock()
logs := bc.logs
bc.logs = nil
bc.logsTaken = true
return logs
}
return nil
}
107 changes: 107 additions & 0 deletions otelcol/buffered_core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelcol

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
)

func Test_bufferedCore_Level(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
assert.Equal(t, zapcore.InfoLevel, bc.Level())
}

func Test_bufferedCore_Check(t *testing.T) {
t.Run("check passed", func(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.InfoLevel,
}
expected := &zapcore.CheckedEntry{}
expected = expected.AddCore(e, bc)
ce := bc.Check(e, nil)
assert.Equal(t, expected, ce)
})

t.Run("check did not pass", func(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.DebugLevel,
}
ce := bc.Check(e, nil)
assert.Nil(t, ce)
})
}

func Test_bufferedCore_With(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
bc.logsTaken = true
bc.context = []zapcore.Field{
{Key: "original", String: "context"},
}
inputs := []zapcore.Field{
{Key: "test", String: "passed"},
}
expected := []zapcore.Field{
{Key: "original", String: "context"},
{Key: "test", String: "passed"},
}
newBC := bc.With(inputs)
assert.Equal(t, expected, newBC.(*bufferedCore).context)
assert.True(t, newBC.(*bufferedCore).logsTaken)
}

func Test_bufferedCore_Write(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "test",
}
fields := []zapcore.Field{
{Key: "field1", String: "value1"},
}
err := bc.Write(e, fields)
require.NoError(t, err)

expected := loggedEntry{
e,
fields,
}
require.Equal(t, 1, len(bc.logs))
require.Equal(t, expected, bc.logs[0])
}

func Test_bufferedCore_Sync(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
assert.NoError(t, bc.Sync())
}

func Test_bufferedCore_TakeLogs(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "test",
}
fields := []zapcore.Field{
{Key: "field1", String: "value1"},
}
err := bc.Write(e, fields)
require.NoError(t, err)

expected := []loggedEntry{
{
e,
fields,
},
}
assert.Equal(t, expected, bc.TakeLogs())
assert.Nil(t, bc.logs)

assert.Error(t, bc.Write(e, fields))
assert.Nil(t, bc.TakeLogs())
}
59 changes: 54 additions & 5 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand All @@ -15,6 +16,7 @@

"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
Expand Down Expand Up @@ -108,15 +110,20 @@
// signalsChannel is used to receive termination signals from the OS.
signalsChannel chan os.Signal
// asyncErrorChannel is used to signal a fatal error from any component.
asyncErrorChannel chan error
asyncErrorChannel chan error
bc *bufferedCore
updateConfigProviderLogger func(core zapcore.Core)
}

// NewCollector creates and returns a new instance of Collector.
func NewCollector(set CollectorSettings) (*Collector, error) {
var err error
configProvider := set.ConfigProvider

set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.NewNop()}
bc := newBufferedCore(zapcore.DebugLevel)
cc := &collectorCore{core: bc}
options := append([]zap.Option{zap.WithCaller(true)}, set.LoggingOptions...)
set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, options...)}
set.ConfigProviderSettings.ResolverSettings.ConverterSettings = confmap.ConverterSettings{}

if configProvider == nil {
Expand All @@ -134,9 +141,11 @@
shutdownChan: make(chan struct{}),
// Per signal.Notify documentation, a size of the channel equaled with
// the number of signals getting notified on is recommended.
signalsChannel: make(chan os.Signal, 3),
asyncErrorChannel: make(chan error),
configProvider: configProvider,
signalsChannel: make(chan os.Signal, 3),
asyncErrorChannel: make(chan error),
configProvider: configProvider,
bc: bc,
updateConfigProviderLogger: cc.SetCore,
}, nil
}

Expand Down Expand Up @@ -202,6 +211,18 @@
if err != nil {
return err
}
if col.updateConfigProviderLogger != nil {
col.updateConfigProviderLogger(col.service.Logger().Core())
}
if col.bc != nil {
x := col.bc.TakeLogs()
for _, log := range x {
ce := col.service.Logger().Core().Check(log.Entry, nil)
if ce != nil {
ce.Write(log.Context...)

Check warning on line 222 in otelcol/collector.go

View check run for this annotation

Codecov / codecov/patch

otelcol/collector.go#L220-L222

Added lines #L220 - L222 were not covered by tests
}
}
}

if !col.set.SkipSettingGRPCLogger {
grpclog.SetLogger(col.service.Logger(), cfg.Service.Telemetry.Logs.Level)
Expand Down Expand Up @@ -243,12 +264,40 @@
return cfg.Validate()
}

func newFallbackLogger(options []zap.Option) (*zap.Logger, error) {
ec := zap.NewProductionEncoderConfig()
ec.EncodeTime = zapcore.ISO8601TimeEncoder
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
Encoding: "console",
EncoderConfig: ec,
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
return zapCfg.Build(options...)
}

// Run starts the collector according to the given configuration, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
// Sets up the control logic for config reloading and shutdown.
func (col *Collector) Run(ctx context.Context) error {
if err := col.setupConfigurationComponents(ctx); err != nil {
col.setCollectorState(StateClosed)
logger, loggerErr := newFallbackLogger(col.set.LoggingOptions)
if loggerErr != nil {
return errors.Join(err, fmt.Errorf("unable to create fallback logger: %w", loggerErr))

Check warning on line 288 in otelcol/collector.go

View check run for this annotation

Codecov / codecov/patch

otelcol/collector.go#L288

Added line #L288 was not covered by tests
}

if col.bc != nil {
x := col.bc.TakeLogs()
for _, log := range x {
ce := logger.Core().Check(log.Entry, nil)
if ce != nil {
ce.Write(log.Context...)

Check warning on line 296 in otelcol/collector.go

View check run for this annotation

Codecov / codecov/patch

otelcol/collector.go#L294-L296

Added lines #L294 - L296 were not covered by tests
}
}
}

return err
}

Expand Down
58 changes: 58 additions & 0 deletions otelcol/collector_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"sync"

"go.uber.org/zap/zapcore"
)

var _ zapcore.Core = (*collectorCore)(nil)

type collectorCore struct {
core zapcore.Core
rw sync.RWMutex
}

func (c *collectorCore) Enabled(l zapcore.Level) bool {
c.rw.RLock()
defer c.rw.RUnlock()
return c.core.Enabled(l)
}

func (c *collectorCore) With(f []zapcore.Field) zapcore.Core {
c.rw.RLock()
defer c.rw.RUnlock()
return &collectorCore{
core: c.core.With(f),
}
}

func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
c.rw.RLock()
defer c.rw.RUnlock()
if c.core.Enabled(e.Level) {
return ce.AddCore(e, c)
}
return ce
}

func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error {
c.rw.RLock()
defer c.rw.RUnlock()
return c.core.Write(e, f)
}

func (c *collectorCore) Sync() error {
c.rw.RLock()
defer c.rw.RUnlock()
return c.core.Sync()
}

func (c *collectorCore) SetCore(core zapcore.Core) {
c.rw.Lock()
defer c.rw.Unlock()
c.core = core
}
Loading
Loading