Skip to content

Commit

Permalink
Fix Shutdown behavior for batchprocessor
Browse files Browse the repository at this point in the history
I added a Shutdown() test that does basic verification of the behavior of the
Shutdown() function. More verifications can be added later.

The test revealed a bug in batchprocessor Shutdown() function which would
not wait until all pending data was drained.
  • Loading branch information
tigrannajaryan committed Mar 1, 2021
1 parent 0ed7a4c commit c980f54
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 4 deletions.
79 changes: 79 additions & 0 deletions component/componenttest/shutdown_verifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
)

func createSingleSpanTrace() pdata.Traces {
d := pdata.NewTraces()
d.ResourceSpans().Resize(1)
d.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
span := d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
span.SetName("test span")
return d
}

func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
// Create a processor and output its produce to a sink.
nextSink := new(consumertest.TracesSink)
processor, err := factory.CreateTracesProcessor(
context.Background(),
component.ProcessorCreateParams{Logger: zap.NewNop()},
cfg,
nextSink,
)
if err != nil {
if err == configerror.ErrDataTypeIsNotSupported {
return
}
require.NoError(t, err)
}
err = processor.Start(context.Background(), NewNopHost())
assert.NoError(t, err)

// Send some traces to the processor.
const generatedCount = 10
for i := 0; i < generatedCount; i++ {
processor.ConsumeTraces(context.Background(), createSingleSpanTrace())
}

// Now shutdown the processor.
err = processor.Shutdown(context.Background())
assert.NoError(t, err)

// The Shutdown() is done. It means the processor must have sent everything we
// gave it to the next sink.
assert.EqualValues(t, generatedCount, nextSink.SpansCount())
}

func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg)
// TODO: add metrics and logs verification.
// TODO: add other shutdown verifications.
}
11 changes: 7 additions & 4 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type batchProcessor struct {
sendBatchMaxSize uint32

timer *time.Timer
done chan struct{}
doneCh chan struct{}
newItem chan interface{}
batch batch

Expand Down Expand Up @@ -87,7 +87,7 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
sendBatchSize: cfg.SendBatchSize,
sendBatchMaxSize: cfg.SendBatchMaxSize,
timeout: cfg.Timeout,
done: make(chan struct{}, 1),
doneCh: make(chan struct{}, 1),
newItem: make(chan interface{}, runtime.NumCPU()),
batch: batch,
ctx: ctx,
Expand All @@ -108,7 +108,9 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error {
// Shutdown is invoked during service shutdown.
func (bp *batchProcessor) Shutdown(context.Context) error {
bp.cancel()
<-bp.done

// Wait until current batch is drained.
<-bp.doneCh
return nil
}

Expand All @@ -132,7 +134,8 @@ func (bp *batchProcessor) startProcessingCycle() {
// make it cancellable using the context that Shutdown gets as a parameter
bp.sendItems(statTimeoutTriggerSend)
}
close(bp.done)
// Indicate that we finished draining.
close(bp.doneCh)
return
case item := <-bp.newItem:
if item == nil {
Expand Down
5 changes: 5 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
}
return logsReceivedByName
}

func TestShutdown(t *testing.T) {
factory := NewFactory()
componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig())
}

0 comments on commit c980f54

Please sign in to comment.