diff --git a/service/internal/builder/exporters_builder_test.go b/service/internal/builder/exporters_builder_test.go index ad9d036970c..64ecd0f7bbd 100644 --- a/service/internal/builder/exporters_builder_test.go +++ b/service/internal/builder/exporters_builder_test.go @@ -16,7 +16,6 @@ package builder import ( "context" - "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -26,7 +25,6 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/internal/testcomponents" - "go.opentelemetry.io/collector/service/servicetest" ) func TestBuildExporters(t *testing.T) { @@ -114,32 +112,3 @@ func TestBuildExportersStartStopAll(t *testing.T) { assert.True(t, metricExporter.Stopped) assert.True(t, logsExporter.Stopped) } - -func TestBuildExportersNotSupportedDataType(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - configFile string - }{ - { - configFile: "not_supported_exporter_logs.yaml", - }, - { - configFile: "not_supported_exporter_metrics.yaml", - }, - { - configFile: "not_supported_exporter_traces.yaml", - }, - } - - for _, test := range tests { - t.Run(test.configFile, func(t *testing.T) { - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories) - require.Nil(t, err) - - _, err = BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.Error(t, err) - }) - } -} diff --git a/service/internal/builder/pipelines_builder.go b/service/internal/builder/pipelines_builder.go index 014bfbb7ad1..22fad0a23d5 100644 --- a/service/internal/builder/pipelines_builder.go +++ b/service/internal/builder/pipelines_builder.go @@ -43,7 +43,7 @@ type builtPipeline struct { // can mutate the TraceData or MetricsData input argument. MutatesData bool - processors []component.Processor + Processors []component.Processor } // BuiltPipelines is a map of build pipelines created from pipeline configs. @@ -57,8 +57,8 @@ func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Ho // This is important so that processors that are earlier in the pipeline and // reference processors that are later in the pipeline do not start sending // data to later pipelines which are not yet started. - for i := len(bp.processors) - 1; i >= 0; i-- { - if err := bp.processors[i].Start(ctx, hostWrapper); err != nil { + for i := len(bp.Processors) - 1; i >= 0; i-- { + if err := bp.Processors[i].Start(ctx, hostWrapper); err != nil { return err } } @@ -71,7 +71,7 @@ func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error { var errs error for _, bp := range bps { bp.logger.Info("Pipeline is shutting down...") - for _, p := range bp.processors { + for _, p := range bp.Processors { errs = multierr.Append(errs, p.Shutdown(ctx)) } bp.logger.Info("Pipeline is shutdown.") @@ -243,7 +243,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config firstLC: lc, Config: pipelineCfg, MutatesData: mutatesConsumedData, - processors: processors, + Processors: processors, } return bp, nil diff --git a/service/internal/builder/pipelines_builder_test.go b/service/internal/builder/pipelines_builder_test.go index f270f04dc5d..596ab96454d 100644 --- a/service/internal/builder/pipelines_builder_test.go +++ b/service/internal/builder/pipelines_builder_test.go @@ -228,36 +228,3 @@ func testPipeline(t *testing.T, pipelineID config.ComponentID, exporterIDs []con err = pipelineProcessors.ShutdownProcessors(context.Background()) assert.NoError(t, err) } - -func TestBuildPipelines_NotSupportedDataType(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - configFile string - }{ - { - configFile: "not_supported_processor_logs.yaml", - }, - { - configFile: "not_supported_processor_metrics.yaml", - }, - { - configFile: "not_supported_processor_traces.yaml", - }, - } - - for _, test := range tests { - t.Run(test.configFile, func(t *testing.T) { - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories) - require.Nil(t, err) - - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.Error(t, err) - assert.Zero(t, len(pipelineProcessors)) - }) - } -} diff --git a/service/internal/builder/receivers_builder.go b/service/internal/builder/receivers_builder.go index f73d2be3e48..4db4cab8665 100644 --- a/service/internal/builder/receivers_builder.go +++ b/service/internal/builder/receivers_builder.go @@ -35,17 +35,17 @@ var errUnusedReceiver = errors.New("receiver defined but not used by any pipelin // a trace and/or a metrics component. type builtReceiver struct { logger *zap.Logger - receiver component.Receiver + Receiver component.Receiver } // Start starts the receiver. func (rcv *builtReceiver) Start(ctx context.Context, host component.Host) error { - return rcv.receiver.Start(ctx, components.NewHostWrapper(host, rcv.logger)) + return rcv.Receiver.Start(ctx, components.NewHostWrapper(host, rcv.logger)) } // Shutdown stops the receiver. func (rcv *builtReceiver) Shutdown(ctx context.Context) error { - return rcv.receiver.Shutdown(ctx) + return rcv.Receiver.Shutdown(ctx) } // Receivers is a map of receivers created from receiver configs. @@ -207,11 +207,11 @@ func attachReceiverToPipelines( return fmt.Errorf("factory for %v produced a nil receiver", id) } - if rcv.receiver != nil { + if rcv.Receiver != nil { // The receiver was previously created for this config. This can happen if the // same receiver type supports more than one data type. In that case we expect // that CreateTracesReceiver and CreateMetricsReceiver return the same value. - if rcv.receiver != createdReceiver { + if rcv.Receiver != createdReceiver { return fmt.Errorf( "factory for %q is implemented incorrectly: "+ "CreateTracesReceiver, CreateMetricsReceiver and CreateLogsReceiver must return "+ @@ -220,7 +220,7 @@ func attachReceiverToPipelines( ) } } - rcv.receiver = createdReceiver + rcv.Receiver = createdReceiver set.Logger.Info("Receiver was built.", zap.String("datatype", string(dataType))) @@ -258,7 +258,7 @@ func (rb *receiversBuilder) buildReceiver(ctx context.Context, set component.Rec } } - if rcv.receiver == nil { + if rcv.Receiver == nil { return nil, errUnusedReceiver } diff --git a/service/internal/builder/receivers_builder_test.go b/service/internal/builder/receivers_builder_test.go index 157d56c0405..21eed8b4867 100644 --- a/service/internal/builder/receivers_builder_test.go +++ b/service/internal/builder/receivers_builder_test.go @@ -108,7 +108,7 @@ func testReceivers(t *testing.T, test testCase) { // Ensure receiver has its fields correctly populated. require.NotNil(t, receiver) - assert.NotNil(t, receiver.receiver) + assert.NotNil(t, receiver.Receiver) // Compose the list of created exporters. var exporters []component.Exporter @@ -130,13 +130,13 @@ func testReceivers(t *testing.T, test testCase) { td := testdata.GenerateTracesOneSpan() if test.hasTraces { - traceProducer := receiver.receiver.(*testcomponents.ExampleReceiver) + traceProducer := receiver.Receiver.(*testcomponents.ExampleReceiver) assert.NoError(t, traceProducer.ConsumeTraces(context.Background(), td)) } md := testdata.GenerateMetricsOneMetric() if test.hasMetrics { - metricsProducer := receiver.receiver.(*testcomponents.ExampleReceiver) + metricsProducer := receiver.Receiver.(*testcomponents.ExampleReceiver) assert.NoError(t, metricsProducer.ConsumeMetrics(context.Background(), md)) } @@ -209,7 +209,7 @@ func TestBuildReceiversBuildCustom(t *testing.T) { // Ensure receiver has its fields correctly populated. require.NotNil(t, receiver) - assert.NotNil(t, receiver.receiver) + assert.NotNil(t, receiver.Receiver) // Compose the list of created exporters. exporterIDs := []config.ComponentID{config.NewComponentID("exampleexporter")} @@ -231,7 +231,7 @@ func TestBuildReceiversBuildCustom(t *testing.T) { // Send one data. log := plog.Logs{} - producer := receiver.receiver.(*testcomponents.ExampleReceiver) + producer := receiver.Receiver.(*testcomponents.ExampleReceiver) require.NoError(t, producer.ConsumeLogs(context.Background(), log)) // Now verify received data. @@ -251,7 +251,7 @@ func TestBuildReceivers_StartAll(t *testing.T) { receivers[config.NewComponentID("example")] = &builtReceiver{ logger: zap.NewNop(), - receiver: receiver, + Receiver: receiver, } assert.False(t, receiver.Started) @@ -282,40 +282,3 @@ func TestBuildReceivers_Unused(t *testing.T) { assert.NoError(t, receivers.StartAll(context.Background(), componenttest.NewNopHost())) assert.NoError(t, receivers.ShutdownAll(context.Background())) } - -func TestBuildReceivers_NotSupportedDataType(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - configFile string - }{ - { - configFile: "not_supported_receiver_logs.yaml", - }, - { - configFile: "not_supported_receiver_metrics.yaml", - }, - { - configFile: "not_supported_receiver_traces.yaml", - }, - } - - for _, test := range tests { - t.Run(test.configFile, func(t *testing.T) { - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories) - assert.NoError(t, err) - require.NotNil(t, cfg) - - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - - receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - assert.Error(t, err) - assert.Zero(t, len(receivers)) - }) - } -} diff --git a/service/internal/builder/testdata/not_supported_processor_logs.yaml b/service/internal/builder/testdata/not_supported_processor_logs.yaml deleted file mode 100644 index bf8ce83017e..00000000000 --- a/service/internal/builder/testdata/not_supported_processor_logs.yaml +++ /dev/null @@ -1,13 +0,0 @@ -receivers: - examplereceiver: -processors: - bf: -exporters: - exampleexporter: - -service: - pipelines: - logs: - receivers: [examplereceiver] - processors: [bf] - exporters: [exampleexporter] diff --git a/service/internal/builder/testdata/not_supported_processor_metrics.yaml b/service/internal/builder/testdata/not_supported_processor_metrics.yaml deleted file mode 100644 index 38d0cd79a14..00000000000 --- a/service/internal/builder/testdata/not_supported_processor_metrics.yaml +++ /dev/null @@ -1,13 +0,0 @@ -receivers: - examplereceiver: -processors: - bf: -exporters: - exampleexporter: - -service: - pipelines: - metrics: - receivers: [examplereceiver] - processors: [bf] - exporters: [exampleexporter] diff --git a/service/internal/builder/testdata/not_supported_processor_traces.yaml b/service/internal/builder/testdata/not_supported_processor_traces.yaml deleted file mode 100644 index d9c931a2d7b..00000000000 --- a/service/internal/builder/testdata/not_supported_processor_traces.yaml +++ /dev/null @@ -1,13 +0,0 @@ -receivers: - examplereceiver: -processors: - bf: -exporters: - exampleexporter: - -service: - pipelines: - traces: - receivers: [examplereceiver] - processors: [bf] - exporters: [exampleexporter] diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go new file mode 100644 index 00000000000..855d21827c8 --- /dev/null +++ b/service/internal/pipelines/pipelines_test.go @@ -0,0 +1,292 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/internal/testcomponents" + "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/service/servicetest" +) + +func TestBuild(t *testing.T) { + tests := []struct { + name string + receiverIDs []config.ComponentID + processorIDs []config.ComponentID + exporterIDs []config.ComponentID + expectedRequests int + }{ + { + name: "pipelines_simple.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_simple_multi_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentID("exampleprocessor")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_simple_no_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_multi.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver"), config.NewComponentIDWithName("examplereceiver", "1")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentIDWithName("exampleprocessor", "1")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "1")}, + expectedRequests: 2, + }, + { + name: "pipelines_multi_no_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver"), config.NewComponentIDWithName("examplereceiver", "1")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "1")}, + expectedRequests: 2, + }, + { + name: "pipelines_exporter_multi_pipeline.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + factories, err := testcomponents.ExampleComponents() + assert.NoError(t, err) + + cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.name), factories) + require.NoError(t, err) + + // Build the pipeline + pipelines, err := Build(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories) + assert.NoError(t, err) + + assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + + // Verify exporters created, started and empty. + for _, expID := range test.exporterIDs { + traceExporter := pipelines.GetExporters()[config.TracesDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, traceExporter.Started) + assert.Equal(t, len(traceExporter.Traces), 0) + + // Validate metrics. + metricsExporter := pipelines.GetExporters()[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, metricsExporter.Started) + assert.Zero(t, len(metricsExporter.Traces)) + + // Validate logs. + logsExporter := pipelines.GetExporters()[config.LogsDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, logsExporter.Started) + assert.Zero(t, len(logsExporter.Traces)) + } + + // Verify processors created in the given order and started. + for i := range test.processorIDs { + traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].Processors[i] + assert.True(t, traceProcessor.(*testcomponents.ExampleProcessor).Started) + + // Validate metrics. + metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].Processors[i] + assert.True(t, metricsProcessor.(*testcomponents.ExampleProcessor).Started) + + // Validate logs. + logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].Processors[i] + assert.True(t, logsProcessor.(*testcomponents.ExampleProcessor).Started) + } + + // Verify receivers created, started and send data to confirm pipelines correctly connected. + for _, recvID := range test.receiverIDs { + traceReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + assert.True(t, traceReceiver.Started) + // Send traces. + assert.NoError(t, traceReceiver.ConsumeTraces(context.Background(), testdata.GenerateTracesOneSpan())) + + metricsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + assert.True(t, metricsReceiver.Started) + // Send metrics. + assert.NoError(t, metricsReceiver.ConsumeMetrics(context.Background(), testdata.GenerateMetricsOneMetric())) + + logsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + assert.True(t, logsReceiver.Started) + // Send logs. + assert.NoError(t, logsReceiver.ConsumeLogs(context.Background(), testdata.GenerateLogsOneLogRecord())) + } + + assert.NoError(t, pipelines.ShutdownAll(context.Background())) + + // Verify receivers shutdown. + for _, recvID := range test.receiverIDs { + traceReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + assert.True(t, traceReceiver.Stopped) + + metricsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + assert.True(t, metricsReceiver.Stopped) + + logsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + assert.True(t, logsReceiver.Stopped) + } + + // Verify processors shutdown. + for i := range test.processorIDs { + traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].Processors[i] + assert.True(t, traceProcessor.(*testcomponents.ExampleProcessor).Stopped) + + // Validate metrics. + metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].Processors[i] + assert.True(t, metricsProcessor.(*testcomponents.ExampleProcessor).Stopped) + + // Validate logs. + logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].Processors[i] + assert.True(t, logsProcessor.(*testcomponents.ExampleProcessor).Stopped) + } + + // Now verify that exporters received data, and are shutdown. + for _, expID := range test.exporterIDs { + // Validate traces. + traceExporter := pipelines.GetExporters()[config.TracesDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, traceExporter.Traces, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateTracesOneSpan(), traceExporter.Traces[0]) + assert.True(t, traceExporter.Stopped) + + // Validate metrics. + metricsExporter := pipelines.GetExporters()[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, metricsExporter.Metrics, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateMetricsOneMetric(), metricsExporter.Metrics[0]) + assert.True(t, metricsExporter.Stopped) + + // Validate logs. + logsExporter := pipelines.GetExporters()[config.LogsDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, logsExporter.Logs, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateLogsOneLogRecord(), logsExporter.Logs[0]) + assert.True(t, logsExporter.Stopped) + } + }) + } +} + +func TestBuildExportersNotSupportedDataType(t *testing.T) { + nopReceiverFactory := componenttest.NewNopReceiverFactory() + nopProcessorFactory := componenttest.NewNopProcessorFactory() + nopExporterFactory := componenttest.NewNopExporterFactory() + badReceiverFactory := newBadReceiverFactory() + badProcessorFactory := newBadProcessorFactory() + badExporterFactory := newBadExporterFactory() + + tests := []struct { + configFile string + }{ + {configFile: "not_supported_exporter_logs.yaml"}, + {configFile: "not_supported_exporter_metrics.yaml"}, + {configFile: "not_supported_exporter_traces.yaml"}, + {configFile: "not_supported_processor_logs.yaml"}, + {configFile: "not_supported_processor_metrics.yaml"}, + {configFile: "not_supported_processor_traces.yaml"}, + {configFile: "not_supported_receiver_traces.yaml"}, + {configFile: "not_supported_receiver_metrics.yaml"}, + {configFile: "not_supported_receiver_traces.yaml"}, + {configFile: "unknown_exporter_config.yaml"}, + {configFile: "unknown_exporter_factory.yaml"}, + {configFile: "unknown_processor_config.yaml"}, + {configFile: "unknown_processor_factory.yaml"}, + {configFile: "unknown_receiver_config.yaml"}, + {configFile: "unknown_receiver_factory.yaml"}, + } + + for _, test := range tests { + t.Run(test.configFile, func(t *testing.T) { + if test.configFile == "unknown_receiver_config.yaml" { + t.Skip("This is a small issue with current implementation which will be fixed in #5512." + + "In real binary this will not be hit, since the validation of the config will catch this issue in advance.") + } + factories := component.Factories{ + Receivers: map[config.Type]component.ReceiverFactory{ + nopReceiverFactory.Type(): nopReceiverFactory, + "unknown": nopReceiverFactory, + badReceiverFactory.Type(): badReceiverFactory, + }, + Processors: map[config.Type]component.ProcessorFactory{ + nopProcessorFactory.Type(): nopProcessorFactory, + "unknown": nopProcessorFactory, + badProcessorFactory.Type(): badProcessorFactory, + }, + Exporters: map[config.Type]component.ExporterFactory{ + nopExporterFactory.Type(): nopExporterFactory, + "unknown": nopExporterFactory, + badExporterFactory.Type(): badExporterFactory, + }, + } + + // Need the unknown factories to do unmarshalling. + cfg, err := servicetest.LoadConfig(filepath.Join("testdata", test.configFile), factories) + require.NoError(t, err) + + // Remove the unknown factories, so they are NOT available during building. + delete(factories.Exporters, "unknown") + delete(factories.Processors, "unknown") + delete(factories.Receivers, "unknown") + + _, err = Build(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories) + assert.Error(t, err) + }) + } +} + +func newBadReceiverFactory() component.ReceiverFactory { + return component.NewReceiverFactory("bf", func() config.Receiver { + return &struct { + config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("bf")), + } + }) +} + +func newBadProcessorFactory() component.ProcessorFactory { + return component.NewProcessorFactory("bf", func() config.Processor { + return &struct { + config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("bf")), + } + }) +} + +func newBadExporterFactory() component.ExporterFactory { + return component.NewExporterFactory("bf", func() config.Exporter { + return &struct { + config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID("bf")), + } + }) +} diff --git a/service/internal/builder/testdata/not_supported_exporter_logs.yaml b/service/internal/pipelines/testdata/not_supported_exporter_logs.yaml similarity index 60% rename from service/internal/builder/testdata/not_supported_exporter_logs.yaml rename to service/internal/pipelines/testdata/not_supported_exporter_logs.yaml index 5dcb27600fe..2907aabeb5f 100644 --- a/service/internal/builder/testdata/not_supported_exporter_logs.yaml +++ b/service/internal/pipelines/testdata/not_supported_exporter_logs.yaml @@ -1,10 +1,10 @@ receivers: - examplereceiver: + nop: exporters: bf: service: pipelines: logs: - receivers: [examplereceiver] + receivers: [nop] exporters: [bf] diff --git a/service/internal/builder/testdata/not_supported_exporter_metrics.yaml b/service/internal/pipelines/testdata/not_supported_exporter_metrics.yaml similarity index 61% rename from service/internal/builder/testdata/not_supported_exporter_metrics.yaml rename to service/internal/pipelines/testdata/not_supported_exporter_metrics.yaml index c46e8b5c40d..962aad9ef81 100644 --- a/service/internal/builder/testdata/not_supported_exporter_metrics.yaml +++ b/service/internal/pipelines/testdata/not_supported_exporter_metrics.yaml @@ -1,10 +1,10 @@ receivers: - examplereceiver: + nop: exporters: bf: service: pipelines: metrics: - receivers: [examplereceiver] + receivers: [nop] exporters: [bf] diff --git a/service/internal/builder/testdata/not_supported_exporter_traces.yaml b/service/internal/pipelines/testdata/not_supported_exporter_traces.yaml similarity index 61% rename from service/internal/builder/testdata/not_supported_exporter_traces.yaml rename to service/internal/pipelines/testdata/not_supported_exporter_traces.yaml index b1b4283e8ee..88aaba0a214 100644 --- a/service/internal/builder/testdata/not_supported_exporter_traces.yaml +++ b/service/internal/pipelines/testdata/not_supported_exporter_traces.yaml @@ -1,10 +1,10 @@ receivers: - examplereceiver: + nop: exporters: bf: service: pipelines: traces: - receivers: [examplereceiver] + receivers: [nop] exporters: [bf] diff --git a/service/internal/pipelines/testdata/not_supported_processor_logs.yaml b/service/internal/pipelines/testdata/not_supported_processor_logs.yaml new file mode 100644 index 00000000000..49636decb7b --- /dev/null +++ b/service/internal/pipelines/testdata/not_supported_processor_logs.yaml @@ -0,0 +1,13 @@ +receivers: + nop: +processors: + bf: +exporters: + nop: + +service: + pipelines: + logs: + receivers: [nop] + processors: [bf] + exporters: [nop] diff --git a/service/internal/pipelines/testdata/not_supported_processor_metrics.yaml b/service/internal/pipelines/testdata/not_supported_processor_metrics.yaml new file mode 100644 index 00000000000..bb62de57e18 --- /dev/null +++ b/service/internal/pipelines/testdata/not_supported_processor_metrics.yaml @@ -0,0 +1,13 @@ +receivers: + nop: +processors: + bf: +exporters: + nop: + +service: + pipelines: + metrics: + receivers: [nop] + processors: [bf] + exporters: [nop] diff --git a/service/internal/pipelines/testdata/not_supported_processor_traces.yaml b/service/internal/pipelines/testdata/not_supported_processor_traces.yaml new file mode 100644 index 00000000000..79baed24a29 --- /dev/null +++ b/service/internal/pipelines/testdata/not_supported_processor_traces.yaml @@ -0,0 +1,13 @@ +receivers: + nop: +processors: + bf: +exporters: + nop: + +service: + pipelines: + traces: + receivers: [nop] + processors: [bf] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_receiver_logs.yaml b/service/internal/pipelines/testdata/not_supported_receiver_logs.yaml similarity index 68% rename from service/internal/builder/testdata/not_supported_receiver_logs.yaml rename to service/internal/pipelines/testdata/not_supported_receiver_logs.yaml index e4ac06f49b8..f59d862f0ae 100644 --- a/service/internal/builder/testdata/not_supported_receiver_logs.yaml +++ b/service/internal/pipelines/testdata/not_supported_receiver_logs.yaml @@ -1,10 +1,10 @@ receivers: bf: # this is the bad receiver factory exporters: - exampleexporter: + nop: service: pipelines: logs: receivers: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_receiver_metrics.yaml b/service/internal/pipelines/testdata/not_supported_receiver_metrics.yaml similarity index 69% rename from service/internal/builder/testdata/not_supported_receiver_metrics.yaml rename to service/internal/pipelines/testdata/not_supported_receiver_metrics.yaml index e73eb326c00..20edc1d388c 100644 --- a/service/internal/builder/testdata/not_supported_receiver_metrics.yaml +++ b/service/internal/pipelines/testdata/not_supported_receiver_metrics.yaml @@ -1,10 +1,10 @@ receivers: bf: # this is the bad receiver factory exporters: - exampleexporter: + nop: service: pipelines: metrics: receivers: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_receiver_traces.yaml b/service/internal/pipelines/testdata/not_supported_receiver_traces.yaml similarity index 68% rename from service/internal/builder/testdata/not_supported_receiver_traces.yaml rename to service/internal/pipelines/testdata/not_supported_receiver_traces.yaml index 1359d79578c..ec29c48b204 100644 --- a/service/internal/builder/testdata/not_supported_receiver_traces.yaml +++ b/service/internal/pipelines/testdata/not_supported_receiver_traces.yaml @@ -1,10 +1,10 @@ receivers: bf: # this is the bad receiver factory exporters: - exampleexporter: + nop: service: pipelines: traces: receivers: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/pipelines/testdata/pipelines_exporter_multi_pipeline.yaml b/service/internal/pipelines/testdata/pipelines_exporter_multi_pipeline.yaml new file mode 100644 index 00000000000..f0932b14d20 --- /dev/null +++ b/service/internal/pipelines/testdata/pipelines_exporter_multi_pipeline.yaml @@ -0,0 +1,37 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [ examplereceiver ] + processors: [ exampleprocessor ] + exporters: [ exampleexporter ] + + traces/1: + receivers: [ examplereceiver ] + exporters: [ exampleexporter ] + + metrics: + receivers: [ examplereceiver ] + processors: [ exampleprocessor ] + exporters: [ exampleexporter ] + + metrics/1: + receivers: [ examplereceiver ] + exporters: [ exampleexporter ] + + logs: + receivers: [ examplereceiver ] + processors: [ exampleprocessor ] + exporters: [ exampleexporter ] + + logs/1: + receivers: [ examplereceiver ] + exporters: [ exampleexporter ] diff --git a/service/internal/pipelines/testdata/pipelines_multi.yaml b/service/internal/pipelines/testdata/pipelines_multi.yaml new file mode 100644 index 00000000000..8f08e07dc75 --- /dev/null +++ b/service/internal/pipelines/testdata/pipelines_multi.yaml @@ -0,0 +1,28 @@ +receivers: + examplereceiver: + examplereceiver/1: + +processors: + exampleprocessor: + exampleprocessor/1: + +exporters: + exampleexporter: + exampleexporter/1: + +service: + pipelines: + traces: + receivers: [ examplereceiver, examplereceiver/1 ] + processors: [ exampleprocessor, exampleprocessor/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + metrics: + receivers: [ examplereceiver, examplereceiver/1 ] + processors: [ exampleprocessor, exampleprocessor/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + logs: + receivers: [ examplereceiver, examplereceiver/1 ] + processors: [ exampleprocessor, exampleprocessor/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] diff --git a/service/internal/pipelines/testdata/pipelines_multi_no_proc.yaml b/service/internal/pipelines/testdata/pipelines_multi_no_proc.yaml new file mode 100644 index 00000000000..5746e41fe5f --- /dev/null +++ b/service/internal/pipelines/testdata/pipelines_multi_no_proc.yaml @@ -0,0 +1,21 @@ +receivers: + examplereceiver: + examplereceiver/1: + +exporters: + exampleexporter: + exampleexporter/1: + +service: + pipelines: + traces: + receivers: [ examplereceiver, examplereceiver/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + metrics: + receivers: [ examplereceiver, examplereceiver/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + logs: + receivers: [ examplereceiver, examplereceiver/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] diff --git a/service/internal/pipelines/testdata/pipelines_simple.yaml b/service/internal/pipelines/testdata/pipelines_simple.yaml new file mode 100644 index 00000000000..c18ca604ab5 --- /dev/null +++ b/service/internal/pipelines/testdata/pipelines_simple.yaml @@ -0,0 +1,25 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [exampleexporter] + + metrics: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [exampleexporter] + + logs: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/service/internal/pipelines/testdata/pipelines_simple_multi_proc.yaml b/service/internal/pipelines/testdata/pipelines_simple_multi_proc.yaml new file mode 100644 index 00000000000..bb51c870843 --- /dev/null +++ b/service/internal/pipelines/testdata/pipelines_simple_multi_proc.yaml @@ -0,0 +1,25 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [exampleprocessor, exampleprocessor] + exporters: [exampleexporter] + + metrics: + receivers: [examplereceiver] + processors: [exampleprocessor, exampleprocessor] + exporters: [exampleexporter] + + logs: + receivers: [examplereceiver] + processors: [exampleprocessor, exampleprocessor] + exporters: [exampleexporter] diff --git a/service/internal/pipelines/testdata/pipelines_simple_no_proc.yaml b/service/internal/pipelines/testdata/pipelines_simple_no_proc.yaml new file mode 100644 index 00000000000..351687e69c5 --- /dev/null +++ b/service/internal/pipelines/testdata/pipelines_simple_no_proc.yaml @@ -0,0 +1,22 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + exporters: [exampleexporter] + + metrics: + receivers: [examplereceiver] + exporters: [exampleexporter] + + logs: + receivers: [examplereceiver] + exporters: [exampleexporter] diff --git a/service/internal/pipelines/testdata/unknown_exporter_config.yaml b/service/internal/pipelines/testdata/unknown_exporter_config.yaml new file mode 100644 index 00000000000..a205cbfa016 --- /dev/null +++ b/service/internal/pipelines/testdata/unknown_exporter_config.yaml @@ -0,0 +1,10 @@ +receivers: + nop: +exporters: + nop: + +service: + pipelines: + logs: + receivers: [nop] + exporters: [nop/1] \ No newline at end of file diff --git a/service/internal/pipelines/testdata/unknown_exporter_factory.yaml b/service/internal/pipelines/testdata/unknown_exporter_factory.yaml new file mode 100644 index 00000000000..51e20dba800 --- /dev/null +++ b/service/internal/pipelines/testdata/unknown_exporter_factory.yaml @@ -0,0 +1,10 @@ +receivers: + nop: +exporters: + unknown: + +service: + pipelines: + logs: + receivers: [nop] + exporters: [unknown] diff --git a/service/internal/pipelines/testdata/unknown_processor_config.yaml b/service/internal/pipelines/testdata/unknown_processor_config.yaml new file mode 100644 index 00000000000..f7df08fb164 --- /dev/null +++ b/service/internal/pipelines/testdata/unknown_processor_config.yaml @@ -0,0 +1,13 @@ +receivers: + nop: +processors: + nop: +exporters: + nop: + +service: + pipelines: + metrics: + receivers: [nop] + processors: [nop/1] + exporters: [nop] \ No newline at end of file diff --git a/service/internal/pipelines/testdata/unknown_processor_factory.yaml b/service/internal/pipelines/testdata/unknown_processor_factory.yaml new file mode 100644 index 00000000000..315b40c0020 --- /dev/null +++ b/service/internal/pipelines/testdata/unknown_processor_factory.yaml @@ -0,0 +1,13 @@ +receivers: + nop: +processors: + unknown: +exporters: + nop: + +service: + pipelines: + metrics: + receivers: [nop] + processors: [unknown] + exporters: [nop] \ No newline at end of file diff --git a/service/internal/pipelines/testdata/unknown_receiver_config.yaml b/service/internal/pipelines/testdata/unknown_receiver_config.yaml new file mode 100644 index 00000000000..ad1558070ed --- /dev/null +++ b/service/internal/pipelines/testdata/unknown_receiver_config.yaml @@ -0,0 +1,10 @@ +receivers: + nop: +exporters: + nop: + +service: + pipelines: + traces: + receivers: [nop/1] + exporters: [nop] diff --git a/service/internal/pipelines/testdata/unknown_receiver_factory.yaml b/service/internal/pipelines/testdata/unknown_receiver_factory.yaml new file mode 100644 index 00000000000..8e0eb5952dd --- /dev/null +++ b/service/internal/pipelines/testdata/unknown_receiver_factory.yaml @@ -0,0 +1,10 @@ +receivers: + unknown: +exporters: + nop: + +service: + pipelines: + traces: + receivers: [unknown] + exporters: [nop]