Skip to content

Commit

Permalink
Fix Shutdown behavior for batchprocessor
Browse files Browse the repository at this point in the history
I added a Shutdown() test that does basic verification of the behavior of the
Shutdown() function. More verifications can be added later.

The test revealed a bug in batchprocessor Shutdown() function which would
hang in Consume* functions after shutdown was called because it was not possible
to send to the channel that the batchprocessor uses.

I will add tests for more components in subsequent PRs. This work is
necessary to ensure Shutdown() works correctly (we will see in future
PRs that we have other bugs that need to be fixed). The test is written
in a generic way that can be used for other components.
  • Loading branch information
tigrannajaryan committed Feb 25, 2021
1 parent 0ed7a4c commit 37ec59b
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 7 deletions.
124 changes: 124 additions & 0 deletions component/componenttest/shutdown_verifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
)

func createSingleSpanTrace() pdata.Traces {
d := pdata.NewTraces()
d.ResourceSpans().Resize(1)
d.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
span := d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
span.SetName("test span")
return d
}

func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
// Create a processor and output its produce to a sink.
nextSink := new(consumertest.TracesSink)
processor, err := factory.CreateTracesProcessor(
context.Background(),
component.ProcessorCreateParams{Logger: zap.NewNop()},
cfg,
nextSink,
)
if err != nil {
if err == configerror.ErrDataTypeIsNotSupported {
return
}
require.NoError(t, err)
}
err = processor.Start(context.Background(), NewNopHost())
assert.NoError(t, err)

doneSignal := make(chan bool)

// Send traces to the processor until we signal via doneSignal, and then continue
// sending some more traces after that.
go generateTraces(processor, doneSignal)

// Wait until the processor outputs anything to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpansCount() > 0
}, time.Second, 1*time.Millisecond)

// Now shutdown the processor.
err = processor.Shutdown(context.Background())
assert.NoError(t, err)

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpansCount()

// Now signal to generateTraces to exit the main generation loop, then send
// a number of follow up traces and stop.
doneSignal <- true

// Wait until all follow up traces are sent.
<-doneSignal

// The follow up traces should not be received by sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount())

// Note that sending the follow up traces also helps catch another bug: component's
// ongoing Consume* function never returning once Shutdown() is called.
}

func generateTraces(consumer consumer.TracesConsumer, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
for {
select {
case <-doneSignal:
break loop
default:
}
consumer.ConsumeTraces(context.Background(), createSingleSpanTrace())
}

// After getting the signal to stop generate another 1000 spans and then
// finally stop.
const afterDoneSpanCount = 1000
for i := 0; i < afterDoneSpanCount; i++ {
consumer.ConsumeTraces(context.Background(), createSingleSpanTrace())
}

// Indicate that we are done.
close(doneSignal)
}

func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg)
// TODO: add metrics and logs verification.
// TODO: add other shutdown verifications.
}
39 changes: 32 additions & 7 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package batchprocessor

import (
"context"
"fmt"
"runtime"
"sync/atomic"
"time"

"go.opencensus.io/stats"
Expand Down Expand Up @@ -47,10 +49,11 @@ type batchProcessor struct {
timeout time.Duration
sendBatchMaxSize uint32

timer *time.Timer
done chan struct{}
newItem chan interface{}
batch batch
timer *time.Timer
doneCh chan struct{}
shutdownFlag int64 // 0=false, 1=true
newItem chan interface{}
batch batch

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -87,7 +90,7 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
sendBatchSize: cfg.SendBatchSize,
sendBatchMaxSize: cfg.SendBatchMaxSize,
timeout: cfg.Timeout,
done: make(chan struct{}, 1),
doneCh: make(chan struct{}, 1),
newItem: make(chan interface{}, runtime.NumCPU()),
batch: batch,
ctx: ctx,
Expand All @@ -108,10 +111,19 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error {
// Shutdown is invoked during service shutdown.
func (bp *batchProcessor) Shutdown(context.Context) error {
bp.cancel()
<-bp.done

// Stop accepting new data.
atomic.StoreInt64(&bp.shutdownFlag, 1)

// Wait until current batch is drained.
<-bp.doneCh
return nil
}

func (bp *batchProcessor) isShutdown() bool {
return atomic.LoadInt64(&bp.shutdownFlag) != 0
}

func (bp *batchProcessor) startProcessingCycle() {
bp.timer = time.NewTimer(bp.timeout)
for {
Expand All @@ -132,7 +144,8 @@ func (bp *batchProcessor) startProcessingCycle() {
// make it cancellable using the context that Shutdown gets as a parameter
bp.sendItems(statTimeoutTriggerSend)
}
close(bp.done)
// Indicate that we finished draining.
close(bp.doneCh)
return
case item := <-bp.newItem:
if item == nil {
Expand Down Expand Up @@ -201,19 +214,31 @@ func (bp *batchProcessor) sendItems(measure *stats.Int64Measure) {

// ConsumeTraces implements TracesProcessor
func (bp *batchProcessor) ConsumeTraces(_ context.Context, td pdata.Traces) error {
if bp.isShutdown() {
return fmt.Errorf("ConsumeTraces called after Shutdown")
}

bp.newItem <- td
return nil
}

// ConsumeTraces implements MetricsProcessor
func (bp *batchProcessor) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
if bp.isShutdown() {
return fmt.Errorf("ConsumeMetrics called after Shutdown")
}

// First thing is convert into a different internal format
bp.newItem <- md
return nil
}

// ConsumeLogs implements LogsProcessor
func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
if bp.isShutdown() {
return fmt.Errorf("ConsumeLogs called after Shutdown")
}

bp.newItem <- ld
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
}
return logsReceivedByName
}

func TestShutdown(t *testing.T) {
factory := NewFactory()
componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig())
}

0 comments on commit 37ec59b

Please sign in to comment.