Skip to content

Commit

Permalink
[pkg/stanza] Switch from SugaredLogger to Logger (#32177)
Browse files Browse the repository at this point in the history
#32662 updated many exported functions to accept
`component.TelemetrySettings` instead of `zap.SugaredLogger`. This PR
continues from there by passing `component.TelemetrySettings` deeper
into the inner packages, and migrates from using `zap.SugaredLogger` to
instead using `zap.Logger` (as provided by
`component.TelemetrySettings`).
  • Loading branch information
djaglowski authored May 16, 2024
1 parent be54222 commit d78d7bb
Show file tree
Hide file tree
Showing 59 changed files with 430 additions and 303 deletions.
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-rm-sugared-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The internal logger has been changed from zap.SugaredLogger to zap.Logger.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Functions accepting a SugaredLogger, and fields of type SugaredLogger, have been deprecated.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-rm-sugared.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelog, journald, tcp, udp, syslog, windowseventlog receivers

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The internal logger has been changed from zap.SugaredLogger to zap.Logger.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This should not have any meaningful impact on most users but the logging format for some logs may have changed.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
14 changes: 8 additions & 6 deletions pkg/stanza/adapter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"

"github.com/cespare/xxhash/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
Expand Down Expand Up @@ -52,6 +53,8 @@ import (
// │ downstream consumers via OutChannel() │
// └─────────────────────────────────────────────────────┘
type Converter struct {
set component.TelemetrySettings

// pLogsChan is a channel on which aggregated logs will be sent to.
pLogsChan chan plog.Logs

Expand All @@ -70,8 +73,6 @@ type Converter struct {
// wg is a WaitGroup that makes sure that we wait for spun up goroutines exit
// when Stop() is called.
wg sync.WaitGroup

logger *zap.Logger
}

type converterOption interface {
Expand All @@ -90,14 +91,15 @@ func (o workerCountOption) apply(c *Converter) {
c.workerCount = o.workerCount
}

func NewConverter(logger *zap.Logger, opts ...converterOption) *Converter {
func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter {
set.Logger = set.Logger.With(zap.String("component", "converter"))
c := &Converter{
set: set,
workerChan: make(chan []*entry.Entry),
workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
pLogsChan: make(chan plog.Logs),
stopChan: make(chan struct{}),
flushChan: make(chan plog.Logs),
logger: logger,
}
for _, opt := range opts {
opt.apply(c)
Expand All @@ -106,7 +108,7 @@ func NewConverter(logger *zap.Logger, opts ...converterOption) *Converter {
}

func (c *Converter) Start() {
c.logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount))
c.set.Logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount))

c.wg.Add(c.workerCount)
for i := 0; i < c.workerCount; i++ {
Expand Down Expand Up @@ -202,7 +204,7 @@ func (c *Converter) flushLoop() {

case pLogs := <-c.flushChan:
if err := c.flush(ctx, pLogs); err != nil {
c.logger.Debug("Problem sending log entries",
c.set.Logger.Debug("Problem sending log entries",
zap.Error(err),
)
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/stanza/adapter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)
Expand Down Expand Up @@ -391,7 +392,9 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()

converter := NewConverter(zap.NewNop())
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
converter := NewConverter(set)
converter.Start()
defer converter.Stop()

Expand Down Expand Up @@ -458,7 +461,9 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()

converter := NewConverter(zap.NewNop())
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
converter := NewConverter(set)
converter.Start()
defer converter.Stop()

Expand Down Expand Up @@ -520,7 +525,9 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) {
}

func TestConverterCancelledContextCancellsTheFlush(t *testing.T) {
converter := NewConverter(zap.NewNop())
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
converter := NewConverter(set)
converter.Start()
defer converter.Stop()
var wg sync.WaitGroup
Expand Down Expand Up @@ -932,8 +939,9 @@ func BenchmarkConverter(b *testing.B) {
for _, wc := range workerCounts {
b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) {
for i := 0; i < b.N; i++ {

converter := NewConverter(zap.NewNop(), withWorkerCount(wc))
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(b)
converter := NewConverter(set, withWorkerCount(wc))
converter.Start()
defer converter.Stop()

Expand Down
7 changes: 4 additions & 3 deletions pkg/stanza/adapter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"

import (
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Deprecated [v0.100.0] Use helper.LogEmitter directly instead
// Deprecated [v0.101.0] Use helper.LogEmitter directly instead
type LogEmitter = helper.LogEmitter

// Deprecated [v0.100.0] Use helper.NewLogEmitter directly instead
// Deprecated [v0.101.0] Use helper.NewLogEmitter directly instead
func NewLogEmitter(logger *zap.SugaredLogger, opts ...helper.EmitterOption) *LogEmitter {
return helper.NewLogEmitter(logger, opts...)
return helper.NewLogEmitter(component.TelemetrySettings{Logger: logger.Desugar()}, opts...)
}
6 changes: 3 additions & 3 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
if baseCfg.flushInterval > 0 {
emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
}
emitter := helper.NewLogEmitter(params.Logger.Sugar(), emitterOpts...)
emitter := helper.NewLogEmitter(params.TelemetrySettings, emitterOpts...)
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
Expand All @@ -66,7 +66,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
if baseCfg.numWorkers > 0 {
converterOpts = append(converterOpts, withWorkerCount(baseCfg.numWorkers))
}
converter := NewConverter(params.Logger, converterOpts...)
converter := NewConverter(params.TelemetrySettings, converterOpts...)
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverCreateSettings: params,
Expand All @@ -75,11 +75,11 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
return nil, err
}
return &receiver{
set: params.TelemetrySettings,
id: params.ID,
pipe: pipe,
emitter: emitter,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
logger: params.Logger,
converter: converter,
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
Expand Down
15 changes: 8 additions & 7 deletions pkg/stanza/adapter/frompdataconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
Expand Down Expand Up @@ -38,6 +39,8 @@ import (
// └─┤ and sends them along entriesChan │
// └───────────────────────────────────────────────────┘
type FromPdataConverter struct {
set component.TelemetrySettings

// entriesChan is a channel on which converted logs will be sent out of the converter.
entriesChan chan []*entry.Entry

Expand All @@ -51,28 +54,26 @@ type FromPdataConverter struct {
// wg is a WaitGroup that makes sure that we wait for spun up goroutines exit
// when Stop() is called.
wg sync.WaitGroup

logger *zap.Logger
}

func NewFromPdataConverter(workerCount int, logger *zap.Logger) *FromPdataConverter {
if logger == nil {
logger = zap.NewNop()
func NewFromPdataConverter(set component.TelemetrySettings, workerCount int) *FromPdataConverter {
if set.Logger == nil {
set.Logger = zap.NewNop()
}
if workerCount <= 0 {
workerCount = int(math.Max(1, float64(runtime.NumCPU())))
}

return &FromPdataConverter{
set: set,
workerChan: make(chan fromConverterWorkerItem, workerCount),
entriesChan: make(chan []*entry.Entry),
stopChan: make(chan struct{}),
logger: logger,
}
}

func (c *FromPdataConverter) Start() {
c.logger.Debug("Starting log converter from pdata", zap.Int("worker_count", cap(c.workerChan)))
c.set.Logger.Debug("Starting log converter from pdata", zap.Int("worker_count", cap(c.workerChan)))

for i := 0; i < cap(c.workerChan); i++ {
c.wg.Add(1)
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/adapter/frompdataconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

Expand Down Expand Up @@ -126,7 +127,7 @@ func BenchmarkFromPdataConverter(b *testing.B) {
b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) {
for i := 0; i < b.N; i++ {

converter := NewFromPdataConverter(wc, nil)
converter := NewFromPdataConverter(componenttest.NewNopTelemetrySettings(), wc)
converter.Start()
defer converter.Stop()
b.ResetTimer()
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
)

func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
emitter := helper.NewLogEmitter(zap.NewNop().Sugar())

set := componenttest.NewNopTelemetrySettings()
set.Logger = zap.NewNop()
emitter := helper.NewLogEmitter(set)
pipe, err := pipeline.Config{
Operators: []operator.Config{
{
Expand All @@ -48,12 +48,12 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
}

return &receiver{
set: set,
id: component.MustNewID("testReceiver"),
pipe: pipe,
emitter: emitter,
consumer: nextConsumer,
logger: zap.NewNop(),
converter: NewConverter(zap.NewNop()),
converter: NewConverter(componenttest.NewNopTelemetrySettings()),
obsrecv: obsrecv,
}, nil
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

type receiver struct {
set component.TelemetrySettings
id component.ID
wg sync.WaitGroup
cancel context.CancelFunc
Expand All @@ -29,7 +30,6 @@ type receiver struct {
emitter *helper.LogEmitter
consumer consumer.Logs
converter *Converter
logger *zap.Logger
obsrecv *receiverhelper.ObsReport

storageID *component.ID
Expand All @@ -43,7 +43,7 @@ var _ rcvr.Logs = (*receiver)(nil)
func (r *receiver) Start(ctx context.Context, host component.Host) error {
rctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.logger.Info("Starting stanza receiver")
r.set.Logger.Info("Starting stanza receiver")

if err := r.setStorageClient(ctx, host); err != nil {
return fmt.Errorf("storage client: %w", err)
Expand Down Expand Up @@ -88,7 +88,7 @@ func (r *receiver) emitterLoop(ctx context.Context) {
for {
select {
case <-doneChan:
r.logger.Debug("Receive loop stopped")
r.set.Logger.Debug("Receive loop stopped")
return

case e, ok := <-r.emitter.OutChannel():
Expand All @@ -97,7 +97,7 @@ func (r *receiver) emitterLoop(ctx context.Context) {
}

if err := r.converter.Batch(e); err != nil {
r.logger.Error("Could not add entry to batch", zap.Error(err))
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
}
}
Expand All @@ -113,19 +113,19 @@ func (r *receiver) consumerLoop(ctx context.Context) {
for {
select {
case <-doneChan:
r.logger.Debug("Consumer loop stopped")
r.set.Logger.Debug("Consumer loop stopped")
return

case pLogs, ok := <-pLogsChan:
if !ok {
r.logger.Debug("Converter channel got closed")
r.set.Logger.Debug("Converter channel got closed")
continue
}
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}
Expand All @@ -138,7 +138,7 @@ func (r *receiver) Shutdown(ctx context.Context) error {
return nil
}

r.logger.Info("Stopping stanza receiver")
r.set.Logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()
r.converter.Stop()
r.cancel()
Expand Down
Loading

0 comments on commit d78d7bb

Please sign in to comment.