diff --git a/component/componenttest/shutdown_verifier.go b/component/componenttest/shutdown_verifier.go new file mode 100644 index 00000000000..b48a06e5979 --- /dev/null +++ b/component/componenttest/shutdown_verifier.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configerror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/consumer/pdata" +) + +func createSingleSpanTrace() pdata.Traces { + d := pdata.NewTraces() + d.ResourceSpans().Resize(1) + d.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) + span := d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) + span.SetName("test span") + return d +} + +func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) { + // Create a processor and output its produce to a sink. + nextSink := new(consumertest.TracesSink) + processor, err := factory.CreateTracesProcessor( + context.Background(), + component.ProcessorCreateParams{Logger: zap.NewNop()}, + cfg, + nextSink, + ) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + return + } + require.NoError(t, err) + } + err = processor.Start(context.Background(), NewNopHost()) + assert.NoError(t, err) + + // Send some traces to the processor. + const generatedCount = 10 + for i := 0; i < generatedCount; i++ { + processor.ConsumeTraces(context.Background(), createSingleSpanTrace()) + } + + // Now shutdown the processor. + err = processor.Shutdown(context.Background()) + assert.NoError(t, err) + + // The Shutdown() is done. It means the processor must have sent everything we + // gave it to the next sink. + assert.EqualValues(t, generatedCount, nextSink.SpansCount()) +} + +func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) { + verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg) + // TODO: add metrics and logs verification. + // TODO: add other shutdown verifications. +} diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 66fbdcf53b8..4374f4837ed 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -108,6 +108,8 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error { // Shutdown is invoked during service shutdown. func (bp *batchProcessor) Shutdown(context.Context) error { bp.cancel() + + // Wait until current batch is drained. <-bp.done return nil } @@ -132,6 +134,7 @@ func (bp *batchProcessor) startProcessingCycle() { // make it cancellable using the context that Shutdown gets as a parameter bp.sendItems(statTimeoutTriggerSend) } + // Indicate that we finished draining. close(bp.done) return case item := <-bp.newItem: diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index b21bbe3dff8..9a6b95ec3c7 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord { } return logsReceivedByName } + +func TestShutdown(t *testing.T) { + factory := NewFactory() + componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig()) +}