Skip to content

Commit

Permalink
[pkg/stanza] Clean up emitter shutdown (#35146)
Browse files Browse the repository at this point in the history
Clean up emitter shutdown in stanza. Instead of using contexts for
signaling shutdown, use an idiomatic close channel. We still have a bit
of context usage, but that's required by the operator API, and is only
used for cancelling a given entry, not for component shutdown.

Follow up to
#34638.
  • Loading branch information
swiatekm authored and jriguera committed Oct 4, 2024
1 parent 226f121 commit 48380ba
Showing 1 changed file with 7 additions and 10 deletions.
17 changes: 7 additions & 10 deletions pkg/stanza/operator/helper/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
type LogEmitter struct {
OutputOperator
logChan chan []*entry.Entry
closeChan chan struct{}
stopOnce sync.Once
cancel context.CancelFunc
batchMux sync.Mutex
batch []*entry.Entry
wg sync.WaitGroup
Expand Down Expand Up @@ -66,10 +66,10 @@ func NewLogEmitter(set component.TelemetrySettings, opts ...EmitterOption) *LogE
e := &LogEmitter{
OutputOperator: op,
logChan: make(chan []*entry.Entry),
closeChan: make(chan struct{}),
maxBatchSize: defaultMaxBatchSize,
batch: make([]*entry.Entry, 0, defaultMaxBatchSize),
flushInterval: defaultFlushInterval,
cancel: func() {},
}
for _, opt := range opts {
opt.apply(e)
Expand All @@ -79,18 +79,15 @@ func NewLogEmitter(set component.TelemetrySettings, opts ...EmitterOption) *LogE

// Start starts the goroutine(s) required for this operator
func (e *LogEmitter) Start(_ operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
e.cancel = cancel

e.wg.Add(1)
go e.flusher(ctx)
go e.flusher()
return nil
}

// Stop will close the log channel and stop running goroutines
func (e *LogEmitter) Stop() error {
e.stopOnce.Do(func() {
e.cancel()
close(e.closeChan)
e.wg.Wait()

close(e.logChan)
Expand Down Expand Up @@ -135,7 +132,7 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry {
}

// flusher flushes the current batch every flush interval. Intended to be run as a goroutine
func (e *LogEmitter) flusher(ctx context.Context) {
func (e *LogEmitter) flusher() {
defer e.wg.Done()

ticker := time.NewTicker(e.flushInterval)
Expand All @@ -147,9 +144,9 @@ func (e *LogEmitter) flusher(ctx context.Context) {
if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
e.flush(context.Background(), oldBatch)
}
case <-ctx.Done():
case <-e.closeChan:
// flush currently batched entries
for oldBatch := e.makeNewBatch(); len(oldBatch) > 0; oldBatch = e.makeNewBatch() {
if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
e.flush(context.Background(), oldBatch)
}
return
Expand Down

0 comments on commit 48380ba

Please sign in to comment.