diff --git a/component/componenttest/nop_exporter.go b/component/componenttest/nop_exporter.go index 1c2e2cee59f..b08a25e69af 100644 --- a/component/componenttest/nop_exporter.go +++ b/component/componenttest/nop_exporter.go @@ -74,16 +74,16 @@ func (f *nopExporterFactory) CreateLogsExporter( } var nopExporterInstance = &nopExporter{ - Component: componenthelper.NewComponent(componenthelper.DefaultComponentSettings()), - TracesConsumer: consumertest.NewTracesNop(), - MetricsConsumer: consumertest.NewMetricsNop(), - LogsConsumer: consumertest.NewLogsNop(), + Component: componenthelper.NewComponent(componenthelper.DefaultComponentSettings()), + Traces: consumertest.NewTracesNop(), + Metrics: consumertest.NewMetricsNop(), + Logs: consumertest.NewLogsNop(), } // nopExporter stores consumed traces and metrics for testing purposes. type nopExporter struct { component.Component - consumer.TracesConsumer - consumer.MetricsConsumer - consumer.LogsConsumer + consumer.Traces + consumer.Metrics + consumer.Logs } diff --git a/component/componenttest/nop_processor.go b/component/componenttest/nop_processor.go index 8e368204aa1..e8ff907c168 100644 --- a/component/componenttest/nop_processor.go +++ b/component/componenttest/nop_processor.go @@ -51,7 +51,7 @@ func (f *nopProcessorFactory) CreateTracesProcessor( _ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, - _ consumer.TracesConsumer, + _ consumer.Traces, ) (component.TracesProcessor, error) { return nopProcessorInstance, nil } @@ -61,7 +61,7 @@ func (f *nopProcessorFactory) CreateMetricsProcessor( _ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, - _ consumer.MetricsConsumer, + _ consumer.Metrics, ) (component.MetricsProcessor, error) { return nopProcessorInstance, nil } @@ -71,24 +71,24 @@ func (f *nopProcessorFactory) CreateLogsProcessor( _ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, - _ consumer.LogsConsumer, + _ consumer.Logs, ) (component.LogsProcessor, error) { return nopProcessorInstance, nil } var nopProcessorInstance = &nopProcessor{ - Component: componenthelper.NewComponent(componenthelper.DefaultComponentSettings()), - TracesConsumer: consumertest.NewTracesNop(), - MetricsConsumer: consumertest.NewMetricsNop(), - LogsConsumer: consumertest.NewLogsNop(), + Component: componenthelper.NewComponent(componenthelper.DefaultComponentSettings()), + Traces: consumertest.NewTracesNop(), + Metrics: consumertest.NewMetricsNop(), + Logs: consumertest.NewLogsNop(), } // nopProcessor stores consumed traces and metrics for testing purposes. type nopProcessor struct { component.Component - consumer.TracesConsumer - consumer.MetricsConsumer - consumer.LogsConsumer + consumer.Traces + consumer.Metrics + consumer.Logs } func (*nopProcessor) GetCapabilities() component.ProcessorCapabilities { diff --git a/component/componenttest/nop_receiver.go b/component/componenttest/nop_receiver.go index a98e65f2934..869afa88b83 100644 --- a/component/componenttest/nop_receiver.go +++ b/component/componenttest/nop_receiver.go @@ -50,7 +50,7 @@ func (f *nopReceiverFactory) CreateTracesReceiver( _ context.Context, _ component.ReceiverCreateParams, _ configmodels.Receiver, - _ consumer.TracesConsumer, + _ consumer.Traces, ) (component.TracesReceiver, error) { return nopReceiverInstance, nil } @@ -60,7 +60,7 @@ func (f *nopReceiverFactory) CreateMetricsReceiver( _ context.Context, _ component.ReceiverCreateParams, _ configmodels.Receiver, - _ consumer.MetricsConsumer, + _ consumer.Metrics, ) (component.MetricsReceiver, error) { return nopReceiverInstance, nil } @@ -70,7 +70,7 @@ func (f *nopReceiverFactory) CreateLogsReceiver( _ context.Context, _ component.ReceiverCreateParams, _ configmodels.Receiver, - _ consumer.LogsConsumer, + _ consumer.Logs, ) (component.LogsReceiver, error) { return nopReceiverInstance, nil } diff --git a/component/exporter.go b/component/exporter.go index d95c520ee22..9bae486ad78 100644 --- a/component/exporter.go +++ b/component/exporter.go @@ -31,19 +31,19 @@ type Exporter interface { // TracesExporter is a Exporter that can consume traces. type TracesExporter interface { Exporter - consumer.TracesConsumer + consumer.Traces } // MetricsExporter is an Exporter that can consume metrics. type MetricsExporter interface { Exporter - consumer.MetricsConsumer + consumer.Metrics } // LogsExporter is an Exporter that can consume logs. type LogsExporter interface { Exporter - consumer.LogsConsumer + consumer.Logs } // ExporterCreateParams is passed to Create*Exporter functions. diff --git a/component/processor.go b/component/processor.go index 8d7779c6a74..d4f25c63c2f 100644 --- a/component/processor.go +++ b/component/processor.go @@ -35,19 +35,19 @@ type Processor interface { // TracesProcessor is a processor that can consume traces. type TracesProcessor interface { Processor - consumer.TracesConsumer + consumer.Traces } // MetricsProcessor is a processor that can consume metrics. type MetricsProcessor interface { Processor - consumer.MetricsConsumer + consumer.Metrics } // LogsProcessor is a processor that can consume logs. type LogsProcessor interface { Processor - consumer.LogsConsumer + consumer.Logs } // ProcessorCapabilities describes the capabilities of a Processor. @@ -91,7 +91,7 @@ type ProcessorFactory interface { ctx context.Context, params ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (TracesProcessor, error) // CreateMetricsProcessor creates a metrics processor based on this config. @@ -101,7 +101,7 @@ type ProcessorFactory interface { ctx context.Context, params ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, ) (MetricsProcessor, error) // CreateLogsProcessor creates a processor based on the config. @@ -111,6 +111,6 @@ type ProcessorFactory interface { ctx context.Context, params ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, ) (LogsProcessor, error) } diff --git a/component/processor_test.go b/component/processor_test.go index 7849f1006e9..8ea45325acc 100644 --- a/component/processor_test.go +++ b/component/processor_test.go @@ -40,17 +40,17 @@ func (f *TestProcessorFactory) CreateDefaultConfig() configmodels.Processor { } // CreateTraceProcessor creates a trace processor based on this config. -func (f *TestProcessorFactory) CreateTracesProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (TracesProcessor, error) { +func (f *TestProcessorFactory) CreateTracesProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.Traces) (TracesProcessor, error) { return nil, configerror.ErrDataTypeIsNotSupported } // CreateMetricsProcessor creates a metrics processor based on this config. -func (f *TestProcessorFactory) CreateMetricsProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.MetricsConsumer) (MetricsProcessor, error) { +func (f *TestProcessorFactory) CreateMetricsProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.Metrics) (MetricsProcessor, error) { return nil, configerror.ErrDataTypeIsNotSupported } // CreateMetricsProcessor creates a metrics processor based on this config. -func (f *TestProcessorFactory) CreateLogsProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.LogsConsumer) (LogsProcessor, error) { +func (f *TestProcessorFactory) CreateLogsProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.Logs) (LogsProcessor, error) { return nil, configerror.ErrDataTypeIsNotSupported } diff --git a/component/receiver.go b/component/receiver.go index 44b57070b58..54d77f2ac03 100644 --- a/component/receiver.go +++ b/component/receiver.go @@ -30,7 +30,7 @@ type Receiver interface { // A TracesReceiver is an "arbitrary data"-to-"internal format" converter. // Its purpose is to translate data from the wild into internal trace format. -// TracesReceiver feeds a consumer.TracesConsumer with data. +// TracesReceiver feeds a consumer.Traces with data. // // For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces. type TracesReceiver interface { @@ -39,7 +39,7 @@ type TracesReceiver interface { // A MetricsReceiver is an "arbitrary data"-to-"internal format" converter. // Its purpose is to translate data from the wild into internal metrics format. -// MetricsReceiver feeds a consumer.MetricsConsumer with data. +// MetricsReceiver feeds a consumer.Metrics with data. // // For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics. type MetricsReceiver interface { @@ -48,7 +48,7 @@ type MetricsReceiver interface { // A LogsReceiver is a "log data"-to-"internal format" converter. // Its purpose is to translate data from the wild into internal data format. -// LogsReceiver feeds a consumer.LogsConsumer with data. +// LogsReceiver feeds a consumer.Logs with data. type LogsReceiver interface { Receiver } @@ -81,17 +81,17 @@ type ReceiverFactory interface { // If the receiver type does not support tracing or if the config is not valid // error will be returned instead. CreateTracesReceiver(ctx context.Context, params ReceiverCreateParams, - cfg configmodels.Receiver, nextConsumer consumer.TracesConsumer) (TracesReceiver, error) + cfg configmodels.Receiver, nextConsumer consumer.Traces) (TracesReceiver, error) // CreateMetricsReceiver creates a metrics receiver based on this config. // If the receiver type does not support metrics or if the config is not valid // error will be returned instead. CreateMetricsReceiver(ctx context.Context, params ReceiverCreateParams, - cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer) (MetricsReceiver, error) + cfg configmodels.Receiver, nextConsumer consumer.Metrics) (MetricsReceiver, error) // CreateLogsReceiver creates a log receiver based on this config. // If the receiver type does not support the data type or if the config is not valid // error will be returned instead. CreateLogsReceiver(ctx context.Context, params ReceiverCreateParams, - cfg configmodels.Receiver, nextConsumer consumer.LogsConsumer) (LogsReceiver, error) + cfg configmodels.Receiver, nextConsumer consumer.Logs) (LogsReceiver, error) } diff --git a/component/receiver_test.go b/component/receiver_test.go index ee844afd4c2..3fd8fc4288d 100644 --- a/component/receiver_test.go +++ b/component/receiver_test.go @@ -40,17 +40,17 @@ func (f *TestReceiverFactory) CreateDefaultConfig() configmodels.Receiver { } // CreateTraceReceiver creates a trace receiver based on this config. -func (f *TestReceiverFactory) CreateTracesReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.TracesConsumer) (TracesReceiver, error) { +func (f *TestReceiverFactory) CreateTracesReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.Traces) (TracesReceiver, error) { return nil, configerror.ErrDataTypeIsNotSupported } // CreateMetricsReceiver creates a metrics receiver based on this config. -func (f *TestReceiverFactory) CreateMetricsReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.MetricsConsumer) (MetricsReceiver, error) { +func (f *TestReceiverFactory) CreateMetricsReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.Metrics) (MetricsReceiver, error) { return nil, configerror.ErrDataTypeIsNotSupported } // CreateMetricsReceiver creates a metrics receiver based on this config. -func (f *TestReceiverFactory) CreateLogsReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.LogsConsumer) (LogsReceiver, error) { +func (f *TestReceiverFactory) CreateLogsReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.Logs) (LogsReceiver, error) { return nil, configerror.ErrDataTypeIsNotSupported } diff --git a/consumer/consumer.go b/consumer/consumer.go index 2ebc5ff0ce3..b0ef563ea7c 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -21,23 +21,23 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) -// MetricsConsumer is the new metrics consumer interface that receives pdata.Metrics, processes it +// Metrics is the new metrics consumer interface that receives pdata.Metrics, processes it // as needed, and sends it to the next processing node if any or to the destination. -type MetricsConsumer interface { +type Metrics interface { // ConsumeMetrics receives pdata.Metrics for consumption. ConsumeMetrics(ctx context.Context, md pdata.Metrics) error } -// TracesConsumer is an interface that receives pdata.Traces, processes it +// Traces is an interface that receives pdata.Traces, processes it // as needed, and sends it to the next processing node if any or to the destination. -type TracesConsumer interface { +type Traces interface { // ConsumeTraces receives pdata.Traces for consumption. ConsumeTraces(ctx context.Context, td pdata.Traces) error } -// LogsConsumer is an interface that receives pdata.Logs, processes it +// Logs is an interface that receives pdata.Logs, processes it // as needed, and sends it to the next processing node if any or to the destination. -type LogsConsumer interface { +type Logs interface { // ConsumeLogs receives pdata.Logs for consumption. ConsumeLogs(ctx context.Context, ld pdata.Logs) error } diff --git a/consumer/consumertest/err.go b/consumer/consumertest/err.go index 818608adb8a..e154a37cc95 100644 --- a/consumer/consumertest/err.go +++ b/consumer/consumertest/err.go @@ -37,17 +37,17 @@ func (er *errConsumer) ConsumeLogs(context.Context, pdata.Logs) error { return er.err } -// NewTracesErr returns a consumer.TracesConsumer that just drops all received data and returns the given error. -func NewTracesErr(err error) consumer.TracesConsumer { +// NewTracesErr returns a consumer.Traces that just drops all received data and returns the given error. +func NewTracesErr(err error) consumer.Traces { return &errConsumer{err: err} } -// NewMetricsErr returns a consumer.MetricsConsumer that just drops all received data and returns the given error. -func NewMetricsErr(err error) consumer.MetricsConsumer { +// NewMetricsErr returns a consumer.Metrics that just drops all received data and returns the given error. +func NewMetricsErr(err error) consumer.Metrics { return &errConsumer{err: err} } -// NewLogsErr returns a consumer.LogsConsumer that just drops all received data and returns the given error. -func NewLogsErr(err error) consumer.LogsConsumer { +// NewLogsErr returns a consumer.Logs that just drops all received data and returns the given error. +func NewLogsErr(err error) consumer.Logs { return &errConsumer{err: err} } diff --git a/consumer/consumertest/nop.go b/consumer/consumertest/nop.go index 1a93a6a01ed..b6c560398aa 100644 --- a/consumer/consumertest/nop.go +++ b/consumer/consumertest/nop.go @@ -39,17 +39,17 @@ func (nc *nopConsumer) ConsumeLogs(context.Context, pdata.Logs) error { return nil } -// NewTracesNop returns a consumer.TracesConsumer that just drops all received data and returns no error. -func NewTracesNop() consumer.TracesConsumer { +// NewTracesNop returns a consumer.Traces that just drops all received data and returns no error. +func NewTracesNop() consumer.Traces { return nopInstance } -// NewMetricsNop returns a consumer.MetricsConsumer that just drops all received data and returns no error. -func NewMetricsNop() consumer.MetricsConsumer { +// NewMetricsNop returns a consumer.Metrics that just drops all received data and returns no error. +func NewMetricsNop() consumer.Metrics { return nopInstance } -// NewLogsNop returns a consumer.LogsConsumer that just drops all received data and returns no error. -func NewLogsNop() consumer.LogsConsumer { +// NewLogsNop returns a consumer.Logs that just drops all received data and returns no error. +func NewLogsNop() consumer.Logs { return nopInstance } diff --git a/consumer/consumertest/sink.go b/consumer/consumertest/sink.go index cd39256fd6a..954403faeb6 100644 --- a/consumer/consumertest/sink.go +++ b/consumer/consumertest/sink.go @@ -22,7 +22,7 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) -// TracesSink is a consumer.TracesConsumer that acts like a sink that +// TracesSink is a consumer.Traces that acts like a sink that // stores all traces and allows querying them for testing. type TracesSink struct { mu sync.Mutex @@ -30,7 +30,7 @@ type TracesSink struct { spansCount int } -var _ consumer.TracesConsumer = (*TracesSink)(nil) +var _ consumer.Traces = (*TracesSink)(nil) // ConsumeTraces stores traces to this sink. func (ste *TracesSink) ConsumeTraces(_ context.Context, td pdata.Traces) error { @@ -69,7 +69,7 @@ func (ste *TracesSink) Reset() { ste.spansCount = 0 } -// MetricsSink is a consumer.MetricsConsumer that acts like a sink that +// MetricsSink is a consumer.Metrics that acts like a sink that // stores all metrics and allows querying them for testing. type MetricsSink struct { mu sync.Mutex @@ -77,7 +77,7 @@ type MetricsSink struct { metricsCount int } -var _ consumer.MetricsConsumer = (*MetricsSink)(nil) +var _ consumer.Metrics = (*MetricsSink)(nil) // ConsumeMetrics stores metrics to this sink. func (sme *MetricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { @@ -116,7 +116,7 @@ func (sme *MetricsSink) Reset() { sme.metricsCount = 0 } -// LogsSink is a consumer.LogsConsumer that acts like a sink that +// LogsSink is a consumer.Logs that acts like a sink that // stores all logs and allows querying them for testing. type LogsSink struct { mu sync.Mutex @@ -124,7 +124,7 @@ type LogsSink struct { logRecordsCount int } -var _ consumer.LogsConsumer = (*LogsSink)(nil) +var _ consumer.Logs = (*LogsSink)(nil) // ConsumeLogs stores logs to this sink. func (sle *LogsSink) ConsumeLogs(_ context.Context, ld pdata.Logs) error { diff --git a/consumer/fanoutconsumer/cloningconsumer.go b/consumer/fanoutconsumer/cloningconsumer.go index a82fb8a0ae1..f8a7f0a0c3a 100644 --- a/consumer/fanoutconsumer/cloningconsumer.go +++ b/consumer/fanoutconsumer/cloningconsumer.go @@ -24,7 +24,7 @@ import ( // NewMetricsCloning wraps multiple metrics consumers in a single one and clones the data // before fanning out. -func NewMetricsCloning(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer { +func NewMetricsCloning(mcs []consumer.Metrics) consumer.Metrics { if len(mcs) == 1 { // Don't wrap if no need to do it. return mcs[0] @@ -32,9 +32,9 @@ func NewMetricsCloning(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer return metricsCloningConsumer(mcs) } -type metricsCloningConsumer []consumer.MetricsConsumer +type metricsCloningConsumer []consumer.Metrics -var _ consumer.MetricsConsumer = (*metricsCloningConsumer)(nil) +var _ consumer.Metrics = (*metricsCloningConsumer)(nil) // ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one. func (mfc metricsCloningConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { @@ -61,7 +61,7 @@ func (mfc metricsCloningConsumer) ConsumeMetrics(ctx context.Context, md pdata.M // NewTracesCloning wraps multiple traces consumers in a single one and clones the data // before fanning out. -func NewTracesCloning(tcs []consumer.TracesConsumer) consumer.TracesConsumer { +func NewTracesCloning(tcs []consumer.Traces) consumer.Traces { if len(tcs) == 1 { // Don't wrap if no need to do it. return tcs[0] @@ -69,9 +69,9 @@ func NewTracesCloning(tcs []consumer.TracesConsumer) consumer.TracesConsumer { return tracesCloningConsumer(tcs) } -type tracesCloningConsumer []consumer.TracesConsumer +type tracesCloningConsumer []consumer.Traces -var _ consumer.TracesConsumer = (*tracesCloningConsumer)(nil) +var _ consumer.Traces = (*tracesCloningConsumer)(nil) // ConsumeTraces exports the pdata.Traces to all consumers wrapped by the current one. func (tfc tracesCloningConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error { @@ -98,7 +98,7 @@ func (tfc tracesCloningConsumer) ConsumeTraces(ctx context.Context, td pdata.Tra // NewLogsCloning wraps multiple trace consumers in a single one and clones the data // before fanning out. -func NewLogsCloning(lcs []consumer.LogsConsumer) consumer.LogsConsumer { +func NewLogsCloning(lcs []consumer.Logs) consumer.Logs { if len(lcs) == 1 { // Don't wrap if no need to do it. return lcs[0] @@ -106,9 +106,9 @@ func NewLogsCloning(lcs []consumer.LogsConsumer) consumer.LogsConsumer { return logsCloningConsumer(lcs) } -type logsCloningConsumer []consumer.LogsConsumer +type logsCloningConsumer []consumer.Logs -var _ consumer.LogsConsumer = (*logsCloningConsumer)(nil) +var _ consumer.Logs = (*logsCloningConsumer)(nil) // ConsumeLogs exports the pdata.Logs to all consumers wrapped by the current one. func (lfc logsCloningConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { diff --git a/consumer/fanoutconsumer/cloningconsumer_test.go b/consumer/fanoutconsumer/cloningconsumer_test.go index 45760d05357..f9aeaf1b53d 100644 --- a/consumer/fanoutconsumer/cloningconsumer_test.go +++ b/consumer/fanoutconsumer/cloningconsumer_test.go @@ -27,12 +27,12 @@ import ( func TestTraceProcessorCloningNotMultiplexing(t *testing.T) { nop := consumertest.NewTracesNop() - tfc := NewTracesCloning([]consumer.TracesConsumer{nop}) + tfc := NewTracesCloning([]consumer.Traces{nop}) assert.Same(t, nop, tfc) } func TestTraceProcessorCloningMultiplexing(t *testing.T) { - processors := make([]consumer.TracesConsumer, 3) + processors := make([]consumer.Traces, 3) for i := range processors { processors[i] = new(consumertest.TracesSink) } @@ -70,12 +70,12 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) { func TestMetricsProcessorCloningNotMultiplexing(t *testing.T) { nop := consumertest.NewMetricsNop() - mfc := NewMetrics([]consumer.MetricsConsumer{nop}) + mfc := NewMetrics([]consumer.Metrics{nop}) assert.Same(t, nop, mfc) } func TestMetricsProcessorCloningMultiplexing(t *testing.T) { - processors := make([]consumer.MetricsConsumer, 3) + processors := make([]consumer.Metrics, 3) for i := range processors { processors[i] = new(consumertest.MetricsSink) } @@ -113,12 +113,12 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) { func TestLogsProcessorCloningNotMultiplexing(t *testing.T) { nop := consumertest.NewLogsNop() - lfc := NewLogsCloning([]consumer.LogsConsumer{nop}) + lfc := NewLogsCloning([]consumer.Logs{nop}) assert.Same(t, nop, lfc) } func TestLogsProcessorCloningMultiplexing(t *testing.T) { - processors := make([]consumer.LogsConsumer, 3) + processors := make([]consumer.Logs, 3) for i := range processors { processors[i] = new(consumertest.LogsSink) } diff --git a/consumer/fanoutconsumer/consumer.go b/consumer/fanoutconsumer/consumer.go index e41d5cbe16b..763a5b5e4ae 100644 --- a/consumer/fanoutconsumer/consumer.go +++ b/consumer/fanoutconsumer/consumer.go @@ -28,7 +28,7 @@ import ( ) // NewMetrics wraps multiple metrics consumers in a single one. -func NewMetrics(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer { +func NewMetrics(mcs []consumer.Metrics) consumer.Metrics { if len(mcs) == 1 { // Don't wrap if no need to do it. return mcs[0] @@ -36,9 +36,9 @@ func NewMetrics(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer { return metricsConsumer(mcs) } -type metricsConsumer []consumer.MetricsConsumer +type metricsConsumer []consumer.Metrics -var _ consumer.MetricsConsumer = (*metricsConsumer)(nil) +var _ consumer.Metrics = (*metricsConsumer)(nil) // ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one. func (mfc metricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { @@ -52,7 +52,7 @@ func (mfc metricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) } // NewTraces wraps multiple trace consumers in a single one. -func NewTraces(tcs []consumer.TracesConsumer) consumer.TracesConsumer { +func NewTraces(tcs []consumer.Traces) consumer.Traces { if len(tcs) == 1 { // Don't wrap if no need to do it. return tcs[0] @@ -60,9 +60,9 @@ func NewTraces(tcs []consumer.TracesConsumer) consumer.TracesConsumer { return traceConsumer(tcs) } -type traceConsumer []consumer.TracesConsumer +type traceConsumer []consumer.Traces -var _ consumer.TracesConsumer = (*traceConsumer)(nil) +var _ consumer.Traces = (*traceConsumer)(nil) // ConsumeTraces exports the pdata.Traces to all consumers wrapped by the current one. func (tfc traceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error { @@ -76,7 +76,7 @@ func (tfc traceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) err } // NewLogs wraps multiple log consumers in a single one. -func NewLogs(lcs []consumer.LogsConsumer) consumer.LogsConsumer { +func NewLogs(lcs []consumer.Logs) consumer.Logs { if len(lcs) == 1 { // Don't wrap if no need to do it. return lcs[0] @@ -84,15 +84,15 @@ func NewLogs(lcs []consumer.LogsConsumer) consumer.LogsConsumer { return logsConsumer(lcs) } -type logsConsumer []consumer.LogsConsumer +type logsConsumer []consumer.Logs -var _ consumer.LogsConsumer = (*logsConsumer)(nil) +var _ consumer.Logs = (*logsConsumer)(nil) // ConsumeLogs exports the pdata.Logs to all consumers wrapped by the current one. -func (fc logsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { +func (lfc logsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { var errs []error - for _, tc := range fc { - if err := tc.ConsumeLogs(ctx, ld); err != nil { + for _, lc := range lfc { + if err := lc.ConsumeLogs(ctx, ld); err != nil { errs = append(errs, err) } } diff --git a/consumer/fanoutconsumer/consumer_test.go b/consumer/fanoutconsumer/consumer_test.go index c428a9bdedd..0f03c409a32 100644 --- a/consumer/fanoutconsumer/consumer_test.go +++ b/consumer/fanoutconsumer/consumer_test.go @@ -28,12 +28,12 @@ import ( func TestTracesProcessorNotMultiplexing(t *testing.T) { nop := consumertest.NewTracesNop() - tfc := NewTraces([]consumer.TracesConsumer{nop}) + tfc := NewTraces([]consumer.Traces{nop}) assert.Same(t, nop, tfc) } func TestTracesProcessorMultiplexing(t *testing.T) { - processors := make([]consumer.TracesConsumer, 3) + processors := make([]consumer.Traces, 3) for i := range processors { processors[i] = new(consumertest.TracesSink) } @@ -59,7 +59,7 @@ func TestTracesProcessorMultiplexing(t *testing.T) { } func TestTraceProcessorWhenOneErrors(t *testing.T) { - processors := make([]consumer.TracesConsumer, 3) + processors := make([]consumer.Traces, 3) for i := range processors { processors[i] = new(consumertest.TracesSink) } @@ -82,12 +82,12 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) { func TestMetricsProcessorNotMultiplexing(t *testing.T) { nop := consumertest.NewMetricsNop() - mfc := NewMetrics([]consumer.MetricsConsumer{nop}) + mfc := NewMetrics([]consumer.Metrics{nop}) assert.Same(t, nop, mfc) } func TestMetricsProcessorMultiplexing(t *testing.T) { - processors := make([]consumer.MetricsConsumer, 3) + processors := make([]consumer.Metrics, 3) for i := range processors { processors[i] = new(consumertest.MetricsSink) } @@ -113,7 +113,7 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { } func TestMetricsProcessorWhenOneErrors(t *testing.T) { - processors := make([]consumer.MetricsConsumer, 3) + processors := make([]consumer.Metrics, 3) for i := range processors { processors[i] = new(consumertest.MetricsSink) } @@ -136,12 +136,12 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) { func TestLogsProcessorNotMultiplexing(t *testing.T) { nop := consumertest.NewLogsNop() - lfc := NewLogs([]consumer.LogsConsumer{nop}) + lfc := NewLogs([]consumer.Logs{nop}) assert.Same(t, nop, lfc) } func TestLogsProcessorMultiplexing(t *testing.T) { - processors := make([]consumer.LogsConsumer, 3) + processors := make([]consumer.Logs, 3) for i := range processors { processors[i] = new(consumertest.LogsSink) } @@ -167,7 +167,7 @@ func TestLogsProcessorMultiplexing(t *testing.T) { } func TestLogsProcessorWhenOneErrors(t *testing.T) { - processors := make([]consumer.LogsConsumer, 3) + processors := make([]consumer.Logs, 3) for i := range processors { processors[i] = new(consumertest.LogsSink) } diff --git a/exporter/exporterhelper/factory.go b/exporter/exporterhelper/factory.go index 7b55a60ec2d..f73813869e3 100644 --- a/exporter/exporterhelper/factory.go +++ b/exporter/exporterhelper/factory.go @@ -118,7 +118,7 @@ func (f *factory) CreateTracesExporter( return nil, configerror.ErrDataTypeIsNotSupported } -// CreateMetricsExporter creates a consumer.MetricsConsumer based on this config. +// CreateMetricsExporter creates a component.MetricsExporter based on this config. func (f *factory) CreateMetricsExporter( ctx context.Context, params component.ExporterCreateParams, diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 93ce71155b9..ed0f0dd98b4 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -329,7 +329,7 @@ func createExporterConfig(baseURL string, defaultCfg configmodels.Exporter) *Con return cfg } -func startTraceReceiver(t *testing.T, addr string, next consumer.TracesConsumer) { +func startTraceReceiver(t *testing.T, addr string, next consumer.Traces) { factory := otlpreceiver.NewFactory() cfg := createReceiverConfig(addr, factory.CreateDefaultConfig()) recv, err := factory.CreateTracesReceiver(context.Background(), component.ReceiverCreateParams{Logger: zap.NewNop()}, cfg, next) @@ -337,7 +337,7 @@ func startTraceReceiver(t *testing.T, addr string, next consumer.TracesConsumer) startAndCleanup(t, recv) } -func startMetricsReceiver(t *testing.T, addr string, next consumer.MetricsConsumer) { +func startMetricsReceiver(t *testing.T, addr string, next consumer.Metrics) { factory := otlpreceiver.NewFactory() cfg := createReceiverConfig(addr, factory.CreateDefaultConfig()) recv, err := factory.CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{Logger: zap.NewNop()}, cfg, next) @@ -345,7 +345,7 @@ func startMetricsReceiver(t *testing.T, addr string, next consumer.MetricsConsum startAndCleanup(t, recv) } -func startLogsReceiver(t *testing.T, addr string, next consumer.LogsConsumer) { +func startLogsReceiver(t *testing.T, addr string, next consumer.Logs) { factory := otlpreceiver.NewFactory() cfg := createReceiverConfig(addr, factory.CreateDefaultConfig()) recv, err := factory.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{Logger: zap.NewNop()}, cfg, next) diff --git a/internal/testcomponents/example_exporter.go b/internal/testcomponents/example_exporter.go index d6acd2681fd..5f1abec6f5f 100644 --- a/internal/testcomponents/example_exporter.go +++ b/internal/testcomponents/example_exporter.go @@ -104,13 +104,13 @@ func (exp *ExampleExporterConsumer) Start(_ context.Context, _ component.Host) e return nil } -// ConsumeTraces receives pdata.Traces for processing by the TracesConsumer. +// ConsumeTraces receives pdata.Traces for processing by the consumer.Traces. func (exp *ExampleExporterConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error { exp.Traces = append(exp.Traces, td) return nil } -// ConsumeMetrics receives pdata.Metrics for processing by the MetricsConsumer. +// ConsumeMetrics receives pdata.Metrics for processing by the Metrics. func (exp *ExampleExporterConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { exp.Metrics = append(exp.Metrics, md) return nil diff --git a/internal/testcomponents/example_processor.go b/internal/testcomponents/example_processor.go index b534f1435e6..adeba96749f 100644 --- a/internal/testcomponents/example_processor.go +++ b/internal/testcomponents/example_processor.go @@ -55,22 +55,22 @@ func createDefaultConfig() configmodels.Processor { } } -func createTracesProcessor(_ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, nextConsumer consumer.TracesConsumer) (component.TracesProcessor, error) { - return &exampleProcessor{TracesConsumer: nextConsumer}, nil +func createTracesProcessor(_ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, nextConsumer consumer.Traces) (component.TracesProcessor, error) { + return &exampleProcessor{Traces: nextConsumer}, nil } -func createMetricsProcessor(_ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error) { - return &exampleProcessor{MetricsConsumer: nextConsumer}, nil +func createMetricsProcessor(_ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, nextConsumer consumer.Metrics) (component.MetricsProcessor, error) { + return &exampleProcessor{Metrics: nextConsumer}, nil } -func createLogsProcessor(_ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, nextConsumer consumer.LogsConsumer) (component.LogsProcessor, error) { - return &exampleProcessor{LogsConsumer: nextConsumer}, nil +func createLogsProcessor(_ context.Context, _ component.ProcessorCreateParams, _ configmodels.Processor, nextConsumer consumer.Logs) (component.LogsProcessor, error) { + return &exampleProcessor{Logs: nextConsumer}, nil } type exampleProcessor struct { - consumer.TracesConsumer - consumer.MetricsConsumer - consumer.LogsConsumer + consumer.Traces + consumer.Metrics + consumer.Logs } func (ep *exampleProcessor) Start(_ context.Context, _ component.Host) error { diff --git a/internal/testcomponents/example_receiver.go b/internal/testcomponents/example_receiver.go index b33cabe558d..47ed2aa55e1 100644 --- a/internal/testcomponents/example_receiver.go +++ b/internal/testcomponents/example_receiver.go @@ -66,10 +66,10 @@ func createTracesReceiver( _ context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { receiver := createReceiver(cfg) - receiver.TracesConsumer = nextConsumer + receiver.Traces = nextConsumer return receiver, nil } @@ -78,10 +78,10 @@ func createMetricsReceiver( _ context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, ) (component.MetricsReceiver, error) { receiver := createReceiver(cfg) - receiver.MetricsConsumer = nextConsumer + receiver.Metrics = nextConsumer return receiver, nil } @@ -89,10 +89,10 @@ func createLogsReceiver( _ context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, ) (component.LogsReceiver, error) { receiver := createReceiver(cfg) - receiver.LogsConsumer = nextConsumer + receiver.Logs = nextConsumer return receiver, nil } @@ -116,9 +116,9 @@ func createReceiver(cfg configmodels.Receiver) *ExampleReceiverProducer { type ExampleReceiverProducer struct { Started bool Stopped bool - consumer.TracesConsumer - consumer.MetricsConsumer - consumer.LogsConsumer + consumer.Traces + consumer.Metrics + consumer.Logs } // Start tells the receiver to start its processing. diff --git a/processor/attributesprocessor/factory.go b/processor/attributesprocessor/factory.go index c0a30538004..919e76d1906 100644 --- a/processor/attributesprocessor/factory.go +++ b/processor/attributesprocessor/factory.go @@ -56,7 +56,7 @@ func createTraceProcessor( _ context.Context, _ component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { oCfg := cfg.(*Config) if len(oCfg.Actions) == 0 { @@ -86,7 +86,7 @@ func createLogProcessor( _ context.Context, _ component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, ) (component.LogsProcessor, error) { oCfg := cfg.(*Config) if len(oCfg.Actions) == 0 { diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index fb80871ac37..a1c6e85de14 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -33,7 +33,7 @@ import ( // batch_processor is a component that accepts spans and metrics, places them // into batches and sends downstream. // -// batch_processor implements consumer.TracesConsumer and consumer.MetricsConsumer +// batch_processor implements consumer.Traces and consumer.Metrics // // Batches are sent out with any of the following conditions: // - batch size reaches cfg.SendBatchSize @@ -73,9 +73,9 @@ type batch interface { add(item interface{}) } -var _ consumer.TracesConsumer = (*batchProcessor)(nil) -var _ consumer.MetricsConsumer = (*batchProcessor)(nil) -var _ consumer.LogsConsumer = (*batchProcessor)(nil) +var _ consumer.Traces = (*batchProcessor)(nil) +var _ consumer.Metrics = (*batchProcessor)(nil) +var _ consumer.Logs = (*batchProcessor)(nil) func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batch batch, telemetryLevel configtelemetry.Level) *batchProcessor { ctx, cancel := context.WithCancel(context.Background()) @@ -222,27 +222,27 @@ func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld pdata.Logs) error { } // newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout -func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.TracesConsumer, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor { +func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.Traces, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor { return newBatchProcessor(params, cfg, newBatchTraces(trace), telemetryLevel) } // newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout -func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics consumer.MetricsConsumer, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor { +func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics consumer.Metrics, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor { return newBatchProcessor(params, cfg, newBatchMetrics(metrics), telemetryLevel) } // newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout -func newBatchLogsProcessor(params component.ProcessorCreateParams, logs consumer.LogsConsumer, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor { +func newBatchLogsProcessor(params component.ProcessorCreateParams, logs consumer.Logs, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor { return newBatchProcessor(params, cfg, newBatchLogs(logs), telemetryLevel) } type batchTraces struct { - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces traceData pdata.Traces spanCount uint32 } -func newBatchTraces(nextConsumer consumer.TracesConsumer) *batchTraces { +func newBatchTraces(nextConsumer consumer.Traces) *batchTraces { b := &batchTraces{nextConsumer: nextConsumer} b.reset() return b @@ -279,12 +279,12 @@ func (bt *batchTraces) reset() { } type batchMetrics struct { - nextConsumer consumer.MetricsConsumer + nextConsumer consumer.Metrics metricData pdata.Metrics metricCount uint32 } -func newBatchMetrics(nextConsumer consumer.MetricsConsumer) *batchMetrics { +func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { b := &batchMetrics{nextConsumer: nextConsumer} b.reset() return b @@ -320,12 +320,12 @@ func (bm *batchMetrics) add(item interface{}) { } type batchLogs struct { - nextConsumer consumer.LogsConsumer + nextConsumer consumer.Logs logData pdata.Logs logCount uint32 } -func newBatchLogs(nextConsumer consumer.LogsConsumer) *batchLogs { +func newBatchLogs(nextConsumer consumer.Logs) *batchLogs { b := &batchLogs{nextConsumer: nextConsumer} b.reset() return b diff --git a/processor/batchprocessor/factory.go b/processor/batchprocessor/factory.go index 9e5d4ca441d..0ef1bac8288 100644 --- a/processor/batchprocessor/factory.go +++ b/processor/batchprocessor/factory.go @@ -58,7 +58,7 @@ func createTraceProcessor( _ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { oCfg := cfg.(*Config) level := configtelemetry.GetMetricsLevelFlagValue() @@ -69,7 +69,7 @@ func createMetricsProcessor( _ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, ) (component.MetricsProcessor, error) { oCfg := cfg.(*Config) level := configtelemetry.GetMetricsLevelFlagValue() @@ -80,7 +80,7 @@ func createLogsProcessor( _ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, ) (component.LogsProcessor, error) { oCfg := cfg.(*Config) level := configtelemetry.GetMetricsLevelFlagValue() diff --git a/processor/filterprocessor/factory.go b/processor/filterprocessor/factory.go index 383d16a2238..6ae0d5ee2d9 100644 --- a/processor/filterprocessor/factory.go +++ b/processor/filterprocessor/factory.go @@ -51,7 +51,7 @@ func createMetricsProcessor( _ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, ) (component.MetricsProcessor, error) { fp, err := newFilterMetricProcessor(params.Logger, cfg.(*Config)) if err != nil { diff --git a/processor/memorylimiter/factory.go b/processor/memorylimiter/factory.go index 401957a42c4..b8a6c4062ee 100644 --- a/processor/memorylimiter/factory.go +++ b/processor/memorylimiter/factory.go @@ -55,7 +55,7 @@ func createTraceProcessor( _ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { ml, err := newMemoryLimiter(params.Logger, cfg.(*Config)) if err != nil { @@ -73,7 +73,7 @@ func createMetricsProcessor( _ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, ) (component.MetricsProcessor, error) { ml, err := newMemoryLimiter(params.Logger, cfg.(*Config)) if err != nil { @@ -91,7 +91,7 @@ func createLogsProcessor( _ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, ) (component.LogsProcessor, error) { ml, err := newMemoryLimiter(params.Logger, cfg.(*Config)) if err != nil { diff --git a/processor/memorylimiter/memorylimiter_test.go b/processor/memorylimiter/memorylimiter_test.go index 1cdf8d0759c..84268ed0059 100644 --- a/processor/memorylimiter/memorylimiter_test.go +++ b/processor/memorylimiter/memorylimiter_test.go @@ -36,7 +36,7 @@ import ( func TestNew(t *testing.T) { type args struct { - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces checkInterval time.Duration memoryLimitMiB uint32 memorySpikeLimitMiB uint32 diff --git a/processor/probabilisticsamplerprocessor/factory.go b/processor/probabilisticsamplerprocessor/factory.go index f8af5fc3447..c9ea05c7d8b 100644 --- a/processor/probabilisticsamplerprocessor/factory.go +++ b/processor/probabilisticsamplerprocessor/factory.go @@ -50,7 +50,7 @@ func createTraceProcessor( _ context.Context, _ component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { oCfg := cfg.(*Config) return newTraceProcessor(nextConsumer, *oCfg) diff --git a/processor/probabilisticsamplerprocessor/probabilisticsampler.go b/processor/probabilisticsamplerprocessor/probabilisticsampler.go index 0ae8f9ccc0d..88237d06397 100644 --- a/processor/probabilisticsamplerprocessor/probabilisticsampler.go +++ b/processor/probabilisticsamplerprocessor/probabilisticsampler.go @@ -49,14 +49,14 @@ const ( ) type tracesamplerprocessor struct { - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces scaledSamplingRate uint32 hashSeed uint32 } // newTraceProcessor returns a processor.TracesProcessor that will perform head sampling according to the given // configuration. -func newTraceProcessor(nextConsumer consumer.TracesConsumer, cfg Config) (component.TracesProcessor, error) { +func newTraceProcessor(nextConsumer consumer.Traces, cfg Config) (component.TracesProcessor, error) { if nextConsumer == nil { return nil, componenterror.ErrNilNextConsumer } diff --git a/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go b/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go index aade1e63d35..896016970b1 100644 --- a/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go +++ b/processor/probabilisticsamplerprocessor/probabilisticsampler_test.go @@ -34,7 +34,7 @@ import ( func TestNewTraceProcessor(t *testing.T) { tests := []struct { name string - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces cfg Config want component.TracesProcessor wantErr bool diff --git a/processor/processorhelper/factory.go b/processor/processorhelper/factory.go index c5175f762d2..2cc5c56ebb5 100644 --- a/processor/processorhelper/factory.go +++ b/processor/processorhelper/factory.go @@ -32,13 +32,13 @@ type FactoryOption func(o *factory) type CreateDefaultConfig func() configmodels.Processor // CreateTraceProcessor is the equivalent of component.ProcessorFactory.CreateTracesProcessor() -type CreateTraceProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (component.TracesProcessor, error) +type CreateTraceProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.Traces) (component.TracesProcessor, error) // CreateMetricsProcessor is the equivalent of component.ProcessorFactory.CreateMetricsProcessor() -type CreateMetricsProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.MetricsConsumer) (component.MetricsProcessor, error) +type CreateMetricsProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.Metrics) (component.MetricsProcessor, error) // CreateLogsProcessor is the equivalent of component.ProcessorFactory.CreateLogsProcessor() -type CreateLogsProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.LogsConsumer) (component.LogsProcessor, error) +type CreateLogsProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.Logs) (component.LogsProcessor, error) type factory struct { cfgType configmodels.Type @@ -109,27 +109,37 @@ func (f *factory) CreateDefaultConfig() configmodels.Processor { } // CreateTraceProcessor creates a component.TracesProcessor based on this config. -func (f *factory) CreateTracesProcessor(ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.TracesConsumer) (component.TracesProcessor, error) { +func (f *factory) CreateTracesProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + cfg configmodels.Processor, + nextConsumer consumer.Traces, +) (component.TracesProcessor, error) { if f.createTraceProcessor != nil { return f.createTraceProcessor(ctx, params, cfg, nextConsumer) } return nil, configerror.ErrDataTypeIsNotSupported } -// CreateMetricsProcessor creates a consumer.MetricsConsumer based on this config. -func (f *factory) CreateMetricsProcessor(ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error) { +// CreateMetricsProcessor creates a component.MetricsProcessor based on this config. +func (f *factory) CreateMetricsProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + cfg configmodels.Processor, + nextConsumer consumer.Metrics, +) (component.MetricsProcessor, error) { if f.createMetricsProcessor != nil { return f.createMetricsProcessor(ctx, params, cfg, nextConsumer) } return nil, configerror.ErrDataTypeIsNotSupported } -// CreateLogsProcessor creates a metrics processor based on this config. +// CreateLogsProcessor creates a component.LogsProcessor based on this config. func (f *factory) CreateLogsProcessor( ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, ) (component.LogsProcessor, error) { if f.createLogsProcessor != nil { return f.createLogsProcessor(ctx, params, cfg, nextConsumer) diff --git a/processor/processorhelper/factory_test.go b/processor/processorhelper/factory_test.go index 690a334fbbe..03e7960aa8b 100644 --- a/processor/processorhelper/factory_test.go +++ b/processor/processorhelper/factory_test.go @@ -79,15 +79,15 @@ func defaultConfig() configmodels.Processor { return defaultCfg } -func createTraceProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (component.TracesProcessor, error) { +func createTraceProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.Traces) (component.TracesProcessor, error) { return nil, nil } -func createMetricsProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.MetricsConsumer) (component.MetricsProcessor, error) { +func createMetricsProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.Metrics) (component.MetricsProcessor, error) { return nil, nil } -func createLogsProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.LogsConsumer) (component.LogsProcessor, error) { +func createLogsProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.Logs) (component.LogsProcessor, error) { return nil, nil } diff --git a/processor/processorhelper/processor.go b/processor/processorhelper/processor.go index 0846d9d2846..e68339f39c1 100644 --- a/processor/processorhelper/processor.go +++ b/processor/processorhelper/processor.go @@ -132,7 +132,7 @@ func (bp *baseProcessor) GetCapabilities() component.ProcessorCapabilities { type tracesProcessor struct { baseProcessor processor TProcessor - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces } func (tp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { @@ -151,7 +151,7 @@ func (tp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) e // TODO: Add observability metrics support func NewTraceProcessor( config configmodels.Processor, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, processor TProcessor, options ...Option, ) (component.TracesProcessor, error) { @@ -173,7 +173,7 @@ func NewTraceProcessor( type metricsProcessor struct { baseProcessor processor MProcessor - nextConsumer consumer.MetricsConsumer + nextConsumer consumer.Metrics } func (mp *metricsProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { @@ -195,7 +195,7 @@ func (mp *metricsProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics // TODO: Add observability metrics support func NewMetricsProcessor( config configmodels.Processor, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, processor MProcessor, options ...Option, ) (component.MetricsProcessor, error) { @@ -217,7 +217,7 @@ func NewMetricsProcessor( type logProcessor struct { baseProcessor processor LProcessor - nextConsumer consumer.LogsConsumer + nextConsumer consumer.Logs } func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { @@ -236,7 +236,7 @@ func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { // TODO: Add observability metrics support func NewLogsProcessor( config configmodels.Processor, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, processor LProcessor, options ...Option, ) (component.LogsProcessor, error) { diff --git a/processor/resourceprocessor/factory.go b/processor/resourceprocessor/factory.go index c5e0d8fddc8..5aab38b1b0d 100644 --- a/processor/resourceprocessor/factory.go +++ b/processor/resourceprocessor/factory.go @@ -55,7 +55,7 @@ func createTraceProcessor( _ context.Context, _ component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer) (component.TracesProcessor, error) { + nextConsumer consumer.Traces) (component.TracesProcessor, error) { attrProc, err := createAttrProcessor(cfg.(*Config)) if err != nil { return nil, err @@ -71,7 +71,7 @@ func createMetricsProcessor( _ context.Context, _ component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error) { + nextConsumer consumer.Metrics) (component.MetricsProcessor, error) { attrProc, err := createAttrProcessor(cfg.(*Config)) if err != nil { return nil, err @@ -87,7 +87,7 @@ func createLogsProcessor( _ context.Context, _ component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.LogsConsumer) (component.LogsProcessor, error) { + nextConsumer consumer.Logs) (component.LogsProcessor, error) { attrProc, err := createAttrProcessor(cfg.(*Config)) if err != nil { return nil, err diff --git a/processor/spanprocessor/factory.go b/processor/spanprocessor/factory.go index 0d835953216..6271490e6bb 100644 --- a/processor/spanprocessor/factory.go +++ b/processor/spanprocessor/factory.go @@ -58,7 +58,7 @@ func createTraceProcessor( _ context.Context, _ component.ProcessorCreateParams, cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { // 'from_attributes' or 'to_attributes' under 'name' has to be set for the span diff --git a/receiver/fluentforwardreceiver/collector.go b/receiver/fluentforwardreceiver/collector.go index 1a54c2d45c2..8e46ad299fa 100644 --- a/receiver/fluentforwardreceiver/collector.go +++ b/receiver/fluentforwardreceiver/collector.go @@ -30,12 +30,12 @@ import ( // instances from several Forward events into one to hopefully reduce // allocations and GC overhead. type Collector struct { - nextConsumer consumer.LogsConsumer + nextConsumer consumer.Logs eventCh <-chan Event logger *zap.Logger } -func newCollector(eventCh <-chan Event, next consumer.LogsConsumer, logger *zap.Logger) *Collector { +func newCollector(eventCh <-chan Event, next consumer.Logs, logger *zap.Logger) *Collector { return &Collector{ nextConsumer: next, eventCh: eventCh, diff --git a/receiver/fluentforwardreceiver/factory.go b/receiver/fluentforwardreceiver/factory.go index 5061ad64112..80ee701130e 100644 --- a/receiver/fluentforwardreceiver/factory.go +++ b/receiver/fluentforwardreceiver/factory.go @@ -50,7 +50,7 @@ func createLogsReceiver( _ context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - consumer consumer.LogsConsumer, + consumer consumer.Logs, ) (component.LogsReceiver, error) { rCfg := cfg.(*Config) diff --git a/receiver/fluentforwardreceiver/receiver.go b/receiver/fluentforwardreceiver/receiver.go index 3b110baac8d..e5080a7488d 100644 --- a/receiver/fluentforwardreceiver/receiver.go +++ b/receiver/fluentforwardreceiver/receiver.go @@ -38,7 +38,7 @@ type fluentReceiver struct { cancel context.CancelFunc } -func newFluentReceiver(logger *zap.Logger, conf *Config, next consumer.LogsConsumer) (component.LogsReceiver, error) { +func newFluentReceiver(logger *zap.Logger, conf *Config, next consumer.Logs) (component.LogsReceiver, error) { eventCh := make(chan Event, eventChannelLength) collector := newCollector(eventCh, next, logger) diff --git a/receiver/hostmetricsreceiver/factory.go b/receiver/hostmetricsreceiver/factory.go index 9530175fc09..612b07cc8ef 100644 --- a/receiver/hostmetricsreceiver/factory.go +++ b/receiver/hostmetricsreceiver/factory.go @@ -145,7 +145,7 @@ func createMetricsReceiver( ctx context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - consumer consumer.MetricsConsumer, + consumer consumer.Metrics, ) (component.MetricsReceiver, error) { oCfg := cfg.(*Config) diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 162e2f536f5..9bd940384f2 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -145,7 +145,7 @@ func createTraceReceiver( _ context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { // Convert settings in the source config to configuration struct diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index d1a06900bed..35a17d3ccd2 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -77,7 +77,7 @@ type jReceiver struct { // mu protects the fields of this type mu sync.Mutex - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces instanceName string startOnce sync.Once @@ -117,7 +117,7 @@ var ( func newJaegerReceiver( instanceName string, config *configuration, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, params component.ReceiverCreateParams, ) *jReceiver { return &jReceiver{ @@ -240,7 +240,7 @@ func (jr *jReceiver) Shutdown(context.Context) error { return err } -func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.TracesConsumer) (int, error) { +func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.Traces) (int, error) { if batch == nil { return 0, nil } @@ -255,7 +255,7 @@ var _ configmanager.ClientConfigManager = (*jReceiver)(nil) type agentHandler struct { name string transport string - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces } // EmitZipkinBatch is unsupported agent's diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index f6cd6ee65ad..b5a28ee3a8f 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -96,7 +96,7 @@ func (f *kafkaReceiverFactory) createTraceReceiver( _ context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { c := cfg.(*Config) r, err := newReceiver(*c, params, f.unmarshalers, nextConsumer) diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 8703305c7ae..d6f42b6777a 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -40,7 +40,7 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") type kafkaConsumer struct { name string consumerGroup sarama.ConsumerGroup - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces topics []string cancelConsumeLoop context.CancelFunc unmarshaller Unmarshaller @@ -50,7 +50,7 @@ type kafkaConsumer struct { var _ component.Receiver = (*kafkaConsumer)(nil) -func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.TracesConsumer) (*kafkaConsumer, error) { +func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.Traces) (*kafkaConsumer, error) { unmarshaller := unmarshalers[config.Encoding] if unmarshaller == nil { return nil, errUnrecognizedEncoding @@ -124,7 +124,7 @@ func (c *kafkaConsumer) Shutdown(context.Context) error { type consumerGroupHandler struct { name string unmarshaller Unmarshaller - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces ready chan bool readyCloser sync.Once diff --git a/receiver/opencensusreceiver/factory.go b/receiver/opencensusreceiver/factory.go index 484e37dcbc4..02ac5996b50 100644 --- a/receiver/opencensusreceiver/factory.go +++ b/receiver/opencensusreceiver/factory.go @@ -59,7 +59,7 @@ func createTraceReceiver( _ context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { r, err := createReceiver(cfg) if err != nil { @@ -75,7 +75,7 @@ func createMetricsReceiver( _ context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, ) (component.MetricsReceiver, error) { r, err := createReceiver(cfg) if err != nil { diff --git a/receiver/opencensusreceiver/ocmetrics/opencensus.go b/receiver/opencensusreceiver/ocmetrics/opencensus.go index b005724a3a6..40dfbe45d9f 100644 --- a/receiver/opencensusreceiver/ocmetrics/opencensus.go +++ b/receiver/opencensusreceiver/ocmetrics/opencensus.go @@ -33,11 +33,11 @@ import ( type Receiver struct { agentmetricspb.UnimplementedMetricsServiceServer instanceName string - nextConsumer consumer.MetricsConsumer + nextConsumer consumer.Metrics } // New creates a new ocmetrics.Receiver reference. -func New(instanceName string, nextConsumer consumer.MetricsConsumer) (*Receiver, error) { +func New(instanceName string, nextConsumer consumer.Metrics) (*Receiver, error) { if nextConsumer == nil { return nil, componenterror.ErrNilNextConsumer } diff --git a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go index 11c7c7f0283..ca1a9f538b7 100644 --- a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go +++ b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go @@ -350,7 +350,7 @@ func nodeToKey(n *commonpb.Node) string { return string(blob) } -func ocReceiverOnGRPCServer(t *testing.T, sr consumer.MetricsConsumer) (int, func()) { +func ocReceiverOnGRPCServer(t *testing.T, sr consumer.Metrics) (int, func()) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) diff --git a/receiver/opencensusreceiver/octrace/opencensus.go b/receiver/opencensusreceiver/octrace/opencensus.go index 9185c60fd98..f9250d293cd 100644 --- a/receiver/opencensusreceiver/octrace/opencensus.go +++ b/receiver/opencensusreceiver/octrace/opencensus.go @@ -40,12 +40,12 @@ const ( // Receiver is the type used to handle spans from OpenCensus exporters. type Receiver struct { agenttracepb.UnimplementedTraceServiceServer - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces instanceName string } // New creates a new opencensus.Receiver reference. -func New(instanceName string, nextConsumer consumer.TracesConsumer, opts ...Option) (*Receiver, error) { +func New(instanceName string, nextConsumer consumer.Traces, opts ...Option) (*Receiver, error) { if nextConsumer == nil { return nil, componenterror.ErrNilNextConsumer } diff --git a/receiver/opencensusreceiver/octrace/opencensus_test.go b/receiver/opencensusreceiver/octrace/opencensus_test.go index 98aef88066b..c4f49f17b23 100644 --- a/receiver/opencensusreceiver/octrace/opencensus_test.go +++ b/receiver/opencensusreceiver/octrace/opencensus_test.go @@ -366,7 +366,7 @@ func nodeToKey(n *commonpb.Node) string { return string(blob) } -func ocReceiverOnGRPCServer(t *testing.T, sr consumer.TracesConsumer) (int, func()) { +func ocReceiverOnGRPCServer(t *testing.T, sr consumer.Traces) (int, func()) { ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) diff --git a/receiver/opencensusreceiver/opencensus.go b/receiver/opencensusreceiver/opencensus.go index 09b3f146480..6ce93b944c2 100644 --- a/receiver/opencensusreceiver/opencensus.go +++ b/receiver/opencensusreceiver/opencensus.go @@ -52,8 +52,8 @@ type ocReceiver struct { traceReceiver *octrace.Receiver metricsReceiver *ocmetrics.Receiver - traceConsumer consumer.TracesConsumer - metricsConsumer consumer.MetricsConsumer + traceConsumer consumer.Traces + metricsConsumer consumer.Metrics stopOnce sync.Once startServerOnce sync.Once @@ -70,8 +70,8 @@ func newOpenCensusReceiver( instanceName string, transport string, addr string, - tc consumer.TracesConsumer, - mc consumer.MetricsConsumer, + tc consumer.Traces, + mc consumer.Metrics, opts ...ocOption, ) (*ocReceiver, error) { // TODO: (@odeke-em) use options to enable address binding changes. diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 4cfb7568cc5..70d504ebec7 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -124,7 +124,7 @@ func createTraceReceiver( ctx context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { r, err := createReceiver(cfg, params.Logger) if err != nil { @@ -141,7 +141,7 @@ func createMetricsReceiver( ctx context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - consumer consumer.MetricsConsumer, + consumer consumer.Metrics, ) (component.MetricsReceiver, error) { r, err := createReceiver(cfg, params.Logger) if err != nil { @@ -158,7 +158,7 @@ func createLogReceiver( ctx context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - consumer consumer.LogsConsumer, + consumer consumer.Logs, ) (component.LogsReceiver, error) { r, err := createReceiver(cfg, params.Logger) if err != nil { diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index 60ea0b9094b..d4fb010bbbb 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -250,7 +250,7 @@ func TestCreateLogReceiver(t *testing.T) { cfg *Config wantStartErr bool wantErr bool - sink consumer.LogsConsumer + sink consumer.Logs }{ { name: "default", diff --git a/receiver/otlpreceiver/logs/otlp.go b/receiver/otlpreceiver/logs/otlp.go index ef41687d7b8..814832833fc 100644 --- a/receiver/otlpreceiver/logs/otlp.go +++ b/receiver/otlpreceiver/logs/otlp.go @@ -32,11 +32,11 @@ const ( // Receiver is the type used to handle spans from OpenTelemetry exporters. type Receiver struct { instanceName string - nextConsumer consumer.LogsConsumer + nextConsumer consumer.Logs } // New creates a new Receiver reference. -func New(instanceName string, nextConsumer consumer.LogsConsumer) *Receiver { +func New(instanceName string, nextConsumer consumer.Logs) *Receiver { r := &Receiver{ instanceName: instanceName, nextConsumer: nextConsumer, diff --git a/receiver/otlpreceiver/logs/otlp_test.go b/receiver/otlpreceiver/logs/otlp_test.go index fe3f1008aae..07f313c957c 100644 --- a/receiver/otlpreceiver/logs/otlp_test.go +++ b/receiver/otlpreceiver/logs/otlp_test.go @@ -146,7 +146,7 @@ func makeLogsServiceClient(port int) (collectorlog.LogsServiceClient, func(), er return logClient, doneFn, nil } -func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.LogsConsumer) (int, func()) { +func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (int, func()) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) diff --git a/receiver/otlpreceiver/metrics/otlp.go b/receiver/otlpreceiver/metrics/otlp.go index 268f265515c..80a4232020e 100644 --- a/receiver/otlpreceiver/metrics/otlp.go +++ b/receiver/otlpreceiver/metrics/otlp.go @@ -32,11 +32,11 @@ const ( // Receiver is the type used to handle metrics from OpenTelemetry exporters. type Receiver struct { instanceName string - nextConsumer consumer.MetricsConsumer + nextConsumer consumer.Metrics } // New creates a new Receiver reference. -func New(instanceName string, nextConsumer consumer.MetricsConsumer) *Receiver { +func New(instanceName string, nextConsumer consumer.Metrics) *Receiver { r := &Receiver{ instanceName: instanceName, nextConsumer: nextConsumer, diff --git a/receiver/otlpreceiver/metrics/otlp_test.go b/receiver/otlpreceiver/metrics/otlp_test.go index 5122d083639..e7f82dbcfe3 100644 --- a/receiver/otlpreceiver/metrics/otlp_test.go +++ b/receiver/otlpreceiver/metrics/otlp_test.go @@ -193,7 +193,7 @@ func makeMetricsServiceClient(port int) (collectormetrics.MetricsServiceClient, return metricsClient, doneFn, nil } -func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.MetricsConsumer) (int, func()) { +func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (int, func()) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 2057abdca44..34f642e8f2d 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -198,7 +198,7 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error { return err } -func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.TracesConsumer) error { +func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.Traces) error { if tc == nil { return componenterror.ErrNilNextConsumer } @@ -217,7 +217,7 @@ func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.Tr return nil } -func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.MetricsConsumer) error { +func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.Metrics) error { if mc == nil { return componenterror.ErrNilNextConsumer } @@ -231,7 +231,7 @@ func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer. return nil } -func (r *otlpReceiver) registerLogsConsumer(ctx context.Context, tc consumer.LogsConsumer) error { +func (r *otlpReceiver) registerLogsConsumer(ctx context.Context, tc consumer.Logs) error { if tc == nil { return componenterror.ErrNilNextConsumer } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 100c6894df1..bb8e4266e94 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -713,7 +713,7 @@ func TestHTTPInvalidTLSCredentials(t *testing.T) { `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } -func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { +func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.Traces, mc consumer.Metrics) *otlpReceiver { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.SetName(name) @@ -722,7 +722,7 @@ func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.Tra return newReceiver(t, factory, cfg, tc, mc) } -func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { +func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consumer.Metrics) *otlpReceiver { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.SetName(otlpReceiverName) @@ -731,7 +731,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer, return newReceiver(t, factory, cfg, tc, mc) } -func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { +func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.Traces, mc consumer.Metrics) *otlpReceiver { r, err := createReceiver(cfg, zap.NewNop()) require.NoError(t, err) if tc != nil { diff --git a/receiver/otlpreceiver/trace/otlp.go b/receiver/otlpreceiver/trace/otlp.go index b6b36bf8ac6..2781f5d6b5d 100644 --- a/receiver/otlpreceiver/trace/otlp.go +++ b/receiver/otlpreceiver/trace/otlp.go @@ -33,11 +33,11 @@ const ( // Receiver is the type used to handle spans from OpenTelemetry exporters. type Receiver struct { instanceName string - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces } // New creates a new Receiver reference. -func New(instanceName string, nextConsumer consumer.TracesConsumer) *Receiver { +func New(instanceName string, nextConsumer consumer.Traces) *Receiver { r := &Receiver{ instanceName: instanceName, nextConsumer: nextConsumer, diff --git a/receiver/otlpreceiver/trace/otlp_test.go b/receiver/otlpreceiver/trace/otlp_test.go index ae2fe8cfb9e..2c65e2c1db7 100644 --- a/receiver/otlpreceiver/trace/otlp_test.go +++ b/receiver/otlpreceiver/trace/otlp_test.go @@ -150,7 +150,7 @@ func makeTraceServiceClient(port int) (collectortrace.TraceServiceClient, func() return metricsClient, doneFn, nil } -func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.TracesConsumer) (int, func()) { +func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (int, func()) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) diff --git a/receiver/prometheusreceiver/factory.go b/receiver/prometheusreceiver/factory.go index 46890fa382c..b0e78fc2b60 100644 --- a/receiver/prometheusreceiver/factory.go +++ b/receiver/prometheusreceiver/factory.go @@ -97,7 +97,7 @@ func createMetricsReceiver( _ context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, ) (component.MetricsReceiver, error) { config := cfg.(*Config) if config.PrometheusConfig == nil || len(config.PrometheusConfig.ScrapeConfigs) == 0 { diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go index e019d9bf4be..f933639fbcc 100644 --- a/receiver/prometheusreceiver/internal/ocastore.go +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -41,7 +41,7 @@ type OcaStore struct { ctx context.Context running int32 // access atomically - sink consumer.MetricsConsumer + sink consumer.Metrics mc *metadataService jobsMap *JobsMap useStartTimeMetric bool @@ -52,7 +52,7 @@ type OcaStore struct { } // NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable -func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string) *OcaStore { +func NewOcaStore(ctx context.Context, sink consumer.Metrics, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string) *OcaStore { return &OcaStore{ running: runningStateInit, ctx: ctx, diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go index 9ebe2e7e294..1e92b29e2a1 100644 --- a/receiver/prometheusreceiver/internal/transaction.go +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -58,7 +58,7 @@ type transaction struct { id int64 ctx context.Context isNew bool - sink consumer.MetricsConsumer + sink consumer.Metrics job string instance string jobsMap *JobsMap @@ -72,7 +72,7 @@ type transaction struct { logger *zap.Logger } -func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string, ms *metadataService, sink consumer.MetricsConsumer, logger *zap.Logger) *transaction { +func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverName string, ms *metadataService, sink consumer.Metrics, logger *zap.Logger) *transaction { return &transaction{ id: atomic.AddInt64(&idSeq, 1), ctx: ctx, diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 6230a1ad116..1dbd808b056 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -33,14 +33,14 @@ const transport = "http" // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { cfg *Config - consumer consumer.MetricsConsumer + consumer consumer.Metrics cancelFunc context.CancelFunc logger *zap.Logger } // New creates a new prometheus.Receiver reference. -func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.MetricsConsumer) *pReceiver { +func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metrics) *pReceiver { pr := &pReceiver{ cfg: cfg, consumer: next, diff --git a/receiver/receiverhelper/factory.go b/receiver/receiverhelper/factory.go index 206b2777275..f583aabfbc1 100644 --- a/receiver/receiverhelper/factory.go +++ b/receiver/receiverhelper/factory.go @@ -60,13 +60,13 @@ func WithLogs(createLogsReceiver CreateLogsReceiver) FactoryOption { type CreateDefaultConfig func() configmodels.Receiver // CreateTraceReceiver is the equivalent of component.ReceiverFactory.CreateTracesReceiver() -type CreateTraceReceiver func(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.TracesConsumer) (component.TracesReceiver, error) +type CreateTraceReceiver func(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.Traces) (component.TracesReceiver, error) // CreateMetricsReceiver is the equivalent of component.ReceiverFactory.CreateMetricsReceiver() -type CreateMetricsReceiver func(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.MetricsConsumer) (component.MetricsReceiver, error) +type CreateMetricsReceiver func(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.Metrics) (component.MetricsReceiver, error) // CreateLogsReceiver is the equivalent of component.ReceiverFactory.CreateLogsReceiver() -type CreateLogsReceiver func(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.LogsConsumer) (component.LogsReceiver, error) +type CreateLogsReceiver func(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.Logs) (component.LogsReceiver, error) type factory struct { cfgType configmodels.Type @@ -113,19 +113,19 @@ func (f *factory) CreateTracesReceiver( ctx context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TracesConsumer) (component.TracesReceiver, error) { + nextConsumer consumer.Traces) (component.TracesReceiver, error) { if f.createTraceReceiver != nil { return f.createTraceReceiver(ctx, params, cfg, nextConsumer) } return nil, configerror.ErrDataTypeIsNotSupported } -// CreateMetricsReceiver creates a consumer.MetricsConsumer based on this config. +// CreateMetricsReceiver creates a component.MetricsReceiver based on this config. func (f *factory) CreateMetricsReceiver( ctx context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.MetricsConsumer) (component.MetricsReceiver, error) { + nextConsumer consumer.Metrics) (component.MetricsReceiver, error) { if f.createMetricsReceiver != nil { return f.createMetricsReceiver(ctx, params, cfg, nextConsumer) } @@ -137,7 +137,7 @@ func (f *factory) CreateLogsReceiver( ctx context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.LogsConsumer, + nextConsumer consumer.Logs, ) (component.LogsReceiver, error) { if f.createLogsReceiver != nil { return f.createLogsReceiver(ctx, params, cfg, nextConsumer) diff --git a/receiver/receiverhelper/factory_test.go b/receiver/receiverhelper/factory_test.go index a215cd6d060..9a7ceecbdf5 100644 --- a/receiver/receiverhelper/factory_test.go +++ b/receiver/receiverhelper/factory_test.go @@ -79,15 +79,15 @@ func defaultConfig() configmodels.Receiver { return defaultCfg } -func createTraceReceiver(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.TracesConsumer) (component.TracesReceiver, error) { +func createTraceReceiver(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.Traces) (component.TracesReceiver, error) { return nil, nil } -func createMetricsReceiver(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.MetricsConsumer) (component.MetricsReceiver, error) { +func createMetricsReceiver(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.Metrics) (component.MetricsReceiver, error) { return nil, nil } -func createLogsReceiver(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.LogsConsumer) (component.LogsReceiver, error) { +func createLogsReceiver(context.Context, component.ReceiverCreateParams, configmodels.Receiver, consumer.Logs) (component.LogsReceiver, error) { return nil, nil } diff --git a/receiver/scraperhelper/scrapercontroller.go b/receiver/scraperhelper/scrapercontroller.go index 5d07ffb9740..1c76b347928 100644 --- a/receiver/scraperhelper/scrapercontroller.go +++ b/receiver/scraperhelper/scrapercontroller.go @@ -90,7 +90,7 @@ type controller struct { name string logger *zap.Logger collectionInterval time.Duration - nextConsumer consumer.MetricsConsumer + nextConsumer consumer.Metrics metricsScrapers *multiMetricScraper resourceMetricScrapers []ResourceMetricsScraper @@ -106,7 +106,7 @@ type controller struct { func NewScraperControllerReceiver( cfg *ScraperControllerSettings, logger *zap.Logger, - nextConsumer consumer.MetricsConsumer, + nextConsumer consumer.Metrics, options ...ScraperControllerOption, ) (component.Receiver, error) { if nextConsumer == nil { diff --git a/receiver/scraperhelper/scrapercontroller_test.go b/receiver/scraperhelper/scrapercontroller_test.go index f2ef6a3863d..a211c69d6c1 100644 --- a/receiver/scraperhelper/scrapercontroller_test.go +++ b/receiver/scraperhelper/scrapercontroller_test.go @@ -201,7 +201,7 @@ func TestScrapeController(t *testing.T) { tickerCh := make(chan time.Time) options = append(options, WithTickerChannel(tickerCh)) - var nextConsumer consumer.MetricsConsumer + var nextConsumer consumer.Metrics sink := new(consumertest.MetricsSink) if !test.nilNextConsumer { nextConsumer = sink diff --git a/receiver/zipkinreceiver/factory.go b/receiver/zipkinreceiver/factory.go index 8d10e5cbc10..2940323b0c9 100644 --- a/receiver/zipkinreceiver/factory.go +++ b/receiver/zipkinreceiver/factory.go @@ -61,7 +61,7 @@ func createTraceReceiver( _ context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TracesConsumer, + nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { rCfg := cfg.(*Config) return New(rCfg, nextConsumer) diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index 94768b9ff2e..004e3395db4 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -56,7 +56,7 @@ type ZipkinReceiver struct { // addr is the address onto which the HTTP server will be bound host component.Host - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces instanceName string startOnce sync.Once @@ -69,7 +69,7 @@ type ZipkinReceiver struct { var _ http.Handler = (*ZipkinReceiver)(nil) // New creates a new zipkinreceiver.ZipkinReceiver reference. -func New(config *Config, nextConsumer consumer.TracesConsumer) (*ZipkinReceiver, error) { +func New(config *Config, nextConsumer consumer.Traces) (*ZipkinReceiver, error) { if nextConsumer == nil { return nil, componenterror.ErrNilNextConsumer } diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index b466277f579..b70ba8aee6e 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -54,7 +54,7 @@ const zipkinReceiverName = "zipkin_receiver_test" func TestNew(t *testing.T) { type args struct { address string - nextConsumer consumer.TracesConsumer + nextConsumer consumer.Traces } tests := []struct { name string diff --git a/service/internal/builder/pipelines_builder.go b/service/internal/builder/pipelines_builder.go index 782260af7c3..bc37706072c 100644 --- a/service/internal/builder/pipelines_builder.go +++ b/service/internal/builder/pipelines_builder.go @@ -32,9 +32,9 @@ import ( // processor in the pipeline or the exporter if pipeline has no processors). type builtPipeline struct { logger *zap.Logger - firstTC consumer.TracesConsumer - firstMC consumer.MetricsConsumer - firstLC consumer.LogsConsumer + firstTC consumer.Traces + firstMC consumer.Metrics + firstLC consumer.Logs // MutatesConsumedData is set to true if any processors in the pipeline // can mutate the TraceData or MetricsData input argument. @@ -118,9 +118,9 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf // BuildProcessors the pipeline backwards. // First create a consumer junction point that fans out the data to all exporters. - var tc consumer.TracesConsumer - var mc consumer.MetricsConsumer - var lc consumer.LogsConsumer + var tc consumer.Traces + var mc consumer.Metrics + var lc consumer.Logs switch pipelineCfg.InputType { case configmodels.TracesDataType: @@ -225,10 +225,10 @@ func (pb *pipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []* return result } -func (pb *pipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TracesConsumer { +func (pb *pipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.Traces { builtExporters := pb.getBuiltExportersByNames(exporterNames) - var exporters []consumer.TracesConsumer + var exporters []consumer.Traces for _, builtExp := range builtExporters { exporters = append(exporters, builtExp.getTraceExporter()) } @@ -237,10 +237,10 @@ func (pb *pipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st return fanoutconsumer.NewTraces(exporters) } -func (pb *pipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer { +func (pb *pipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.Metrics { builtExporters := pb.getBuiltExportersByNames(exporterNames) - var exporters []consumer.MetricsConsumer + var exporters []consumer.Metrics for _, builtExp := range builtExporters { exporters = append(exporters, builtExp.getMetricExporter()) } @@ -249,10 +249,10 @@ func (pb *pipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames [] return fanoutconsumer.NewMetrics(exporters) } -func (pb *pipelinesBuilder) buildFanoutExportersLogConsumer(exporterNames []string) consumer.LogsConsumer { +func (pb *pipelinesBuilder) buildFanoutExportersLogConsumer(exporterNames []string) consumer.Logs { builtExporters := pb.getBuiltExportersByNames(exporterNames) - exporters := make([]consumer.LogsConsumer, len(builtExporters)) + exporters := make([]consumer.Logs, len(builtExporters)) for i, builtExp := range builtExporters { exporters[i] = builtExp.getLogExporter() } diff --git a/service/internal/builder/pipelines_builder_test.go b/service/internal/builder/pipelines_builder_test.go index 984d0b0070a..89bc68c1f00 100644 --- a/service/internal/builder/pipelines_builder_test.go +++ b/service/internal/builder/pipelines_builder_test.go @@ -160,7 +160,7 @@ func TestBuildPipelines_BuildVarious(t *testing.T) { // Send one custom data. log := pdata.Logs{} - processor.firstLC.(consumer.LogsConsumer).ConsumeLogs(context.Background(), log) + processor.firstLC.(consumer.Logs).ConsumeLogs(context.Background(), log) // Now verify received data. for _, expConsumer := range exporterConsumers { @@ -221,7 +221,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) { } td := testdata.GenerateTraceDataOneSpan() - processor.firstTC.(consumer.TracesConsumer).ConsumeTraces(context.Background(), td) + processor.firstTC.(consumer.Traces).ConsumeTraces(context.Background(), td) // Now verify received data. for _, expConsumer := range exporterConsumers { diff --git a/service/internal/builder/receivers_builder.go b/service/internal/builder/receivers_builder.go index 276dec2677b..55ba6bdb098 100644 --- a/service/internal/builder/receivers_builder.go +++ b/service/internal/builder/receivers_builder.go @@ -271,13 +271,13 @@ func (rb *receiversBuilder) buildReceiver(ctx context.Context, logger *zap.Logge return rcv, nil } -func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TracesConsumer { +func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces { // Optimize for the case when there is only one processor, no need to create junction point. if len(pipelines) == 1 { return pipelines[0].firstTC } - var pipelineConsumers []consumer.TracesConsumer + var pipelineConsumers []consumer.Traces anyPipelineMutatesData := false for _, pipeline := range pipelines { pipelineConsumers = append(pipelineConsumers, pipeline.firstTC) @@ -296,13 +296,13 @@ func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TracesConsume return fanoutconsumer.NewTraces(pipelineConsumers) } -func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsumer { +func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.Metrics { // Optimize for the case when there is only one processor, no need to create junction point. if len(pipelines) == 1 { return pipelines[0].firstMC } - var pipelineConsumers []consumer.MetricsConsumer + var pipelineConsumers []consumer.Metrics anyPipelineMutatesData := false for _, pipeline := range pipelines { pipelineConsumers = append(pipelineConsumers, pipeline.firstMC) @@ -321,13 +321,13 @@ func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsu return fanoutconsumer.NewMetrics(pipelineConsumers) } -func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.LogsConsumer { +func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.Logs { // Optimize for the case when there is only one processor, no need to create junction point. if len(pipelines) == 1 { return pipelines[0].firstLC } - var pipelineConsumers []consumer.LogsConsumer + var pipelineConsumers []consumer.Logs anyPipelineMutatesData := false for _, pipeline := range pipelines { pipelineConsumers = append(pipelineConsumers, pipeline.firstLC) diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index e57d74dcf71..5ec59a560d2 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -154,7 +154,7 @@ func (mb *MockBackend) ConsumeMetric(md pdata.Metrics) { } } -var _ consumer.TracesConsumer = (*MockTraceConsumer)(nil) +var _ consumer.Traces = (*MockTraceConsumer)(nil) func (mb *MockBackend) ConsumeLogs(ld pdata.Logs) { mb.recordMutex.Lock() @@ -205,7 +205,7 @@ func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) e return nil } -var _ consumer.MetricsConsumer = (*MockMetricConsumer)(nil) +var _ consumer.Metrics = (*MockMetricConsumer)(nil) type MockMetricConsumer struct { numMetricsReceived atomic.Uint64 diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 748506add98..af0d901e416 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -43,7 +43,7 @@ import ( // from Collector and the corresponding entity in the Collector that sends this data is // an exporter. type DataReceiver interface { - Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error + Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error Stop() error // Generate a config string to place in exporter part of collector config @@ -98,7 +98,7 @@ func NewOCDataReceiver(port int) *OCDataReceiver { return &OCDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (or *OCDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, _ consumer.LogsConsumer) error { +func (or *OCDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, _ consumer.Logs) error { factory := opencensusreceiver.NewFactory() cfg := factory.CreateDefaultConfig().(*opencensusreceiver.Config) cfg.SetName(or.ProtocolName()) @@ -153,7 +153,7 @@ func NewJaegerDataReceiver(port int) *JaegerDataReceiver { return &JaegerDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (jr *JaegerDataReceiver) Start(tc consumer.TracesConsumer, _ consumer.MetricsConsumer, _ consumer.LogsConsumer) error { +func (jr *JaegerDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error { factory := jaegerreceiver.NewFactory() cfg := factory.CreateDefaultConfig().(*jaegerreceiver.Config) cfg.SetName(jr.ProtocolName()) @@ -197,7 +197,7 @@ type BaseOTLPDataReceiver struct { compression string } -func (bor *BaseOTLPDataReceiver) Start(tc consumer.TracesConsumer, mc consumer.MetricsConsumer, lc consumer.LogsConsumer) error { +func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { factory := otlpreceiver.NewFactory() cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config) cfg.SetName(bor.exporterType) @@ -299,7 +299,7 @@ func NewZipkinDataReceiver(port int) *ZipkinDataReceiver { return &ZipkinDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (zr *ZipkinDataReceiver) Start(tc consumer.TracesConsumer, _ consumer.MetricsConsumer, _ consumer.LogsConsumer) error { +func (zr *ZipkinDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error { factory := zipkinreceiver.NewFactory() cfg := factory.CreateDefaultConfig().(*zipkinreceiver.Config) cfg.SetName(zr.ProtocolName()) @@ -345,7 +345,7 @@ func NewPrometheusDataReceiver(port int) *PrometheusDataReceiver { return &PrometheusDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (dr *PrometheusDataReceiver) Start(_ consumer.TracesConsumer, mc consumer.MetricsConsumer, _ consumer.LogsConsumer) error { +func (dr *PrometheusDataReceiver) Start(_ consumer.Traces, mc consumer.Metrics, _ consumer.Logs) error { factory := prometheusreceiver.NewFactory() cfg := factory.CreateDefaultConfig().(*prometheusreceiver.Config) addr := fmt.Sprintf("0.0.0.0:%d", dr.Port) diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index 3f1681ac222..328ea2eb2a3 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -66,21 +66,21 @@ type DataSender interface { // to send a batch of Spans to the DataSender interface. type TraceDataSender interface { DataSender - consumer.TracesConsumer + consumer.Traces } // MetricDataSender defines the interface that allows sending metric data. It adds ability // to send a batch of Metrics to the DataSender interface. type MetricDataSender interface { DataSender - consumer.MetricsConsumer + consumer.Metrics } // LogDataSender defines the interface that allows sending log data. It adds ability // to send a batch of Logs to the DataSender interface. type LogDataSender interface { DataSender - consumer.LogsConsumer + consumer.Logs } type DataSenderBase struct { @@ -117,7 +117,7 @@ func (dsb *DataSenderBase) Flush() { // JaegerGRPCDataSender implements TraceDataSender for Jaeger thrift_http exporterType. type JaegerGRPCDataSender struct { DataSenderBase - consumer.TracesConsumer + consumer.Traces } // Ensure JaegerGRPCDataSender implements TraceDataSender. @@ -148,7 +148,7 @@ func (je *JaegerGRPCDataSender) Start() error { return err } - je.TracesConsumer = exp + je.Traces = exp return exp.Start(context.Background(), je) } @@ -190,7 +190,7 @@ func (ods *ocDataSender) ProtocolName() string { // OCTraceDataSender implements TraceDataSender for OpenCensus trace exporterType. type OCTraceDataSender struct { ocDataSender - consumer.TracesConsumer + consumer.Traces } // Ensure OCTraceDataSender implements TraceDataSender. @@ -217,14 +217,14 @@ func (ote *OCTraceDataSender) Start() error { return err } - ote.TracesConsumer = exp + ote.Traces = exp return exp.Start(context.Background(), ote) } // OCMetricsDataSender implements MetricDataSender for OpenCensus metrics exporterType. type OCMetricsDataSender struct { ocDataSender - consumer.MetricsConsumer + consumer.Metrics } // Ensure OCMetricsDataSender implements MetricDataSender. @@ -251,7 +251,7 @@ func (ome *OCMetricsDataSender) Start() error { return err } - ome.MetricsConsumer = exp + ome.Metrics = exp return exp.Start(context.Background(), ome) } @@ -287,7 +287,7 @@ func (ods *otlpHTTPDataSender) ProtocolName() string { // OTLPHTTPTraceDataSender implements TraceDataSender for OTLP/HTTP trace exporterType. type OTLPHTTPTraceDataSender struct { otlpHTTPDataSender - consumer.TracesConsumer + consumer.Traces } // Ensure OTLPHTTPTraceDataSender implements TraceDataSender. @@ -313,14 +313,14 @@ func (ote *OTLPHTTPTraceDataSender) Start() error { return err } - ote.TracesConsumer = exp + ote.Traces = exp return exp.Start(context.Background(), ote) } // OTLPHTTPMetricsDataSender implements MetricDataSender for OTLP/HTTP metrics exporterType. type OTLPHTTPMetricsDataSender struct { otlpHTTPDataSender - consumer.MetricsConsumer + consumer.Metrics } // Ensure OTLPHTTPMetricsDataSender implements MetricDataSender. @@ -347,14 +347,14 @@ func (ome *OTLPHTTPMetricsDataSender) Start() error { return err } - ome.MetricsConsumer = exp + ome.Metrics = exp return exp.Start(context.Background(), ome) } // OTLPHTTPLogsDataSender implements LogsDataSender for OTLP/HTTP logs exporterType. type OTLPHTTPLogsDataSender struct { otlpHTTPDataSender - consumer.LogsConsumer + consumer.Logs } // Ensure OTLPHTTPLogsDataSender implements MetricDataSender. @@ -381,7 +381,7 @@ func (olds *OTLPHTTPLogsDataSender) Start() error { return err } - olds.LogsConsumer = exp + olds.Logs = exp return exp.Start(context.Background(), olds) } @@ -417,7 +417,7 @@ func (ods *otlpDataSender) ProtocolName() string { // OTLPTraceDataSender implements TraceDataSender for OTLP trace exporterType. type OTLPTraceDataSender struct { otlpDataSender - consumer.TracesConsumer + consumer.Traces } // Ensure OTLPTraceDataSender implements TraceDataSender. @@ -443,14 +443,14 @@ func (ote *OTLPTraceDataSender) Start() error { return err } - ote.TracesConsumer = exp + ote.Traces = exp return exp.Start(context.Background(), ote) } // OTLPMetricsDataSender implements MetricDataSender for OTLP metrics exporterType. type OTLPMetricsDataSender struct { otlpDataSender - consumer.MetricsConsumer + consumer.Metrics } // Ensure OTLPMetricsDataSender implements MetricDataSender. @@ -477,14 +477,14 @@ func (ome *OTLPMetricsDataSender) Start() error { return err } - ome.MetricsConsumer = exp + ome.Metrics = exp return exp.Start(context.Background(), ome) } // OTLPLogsDataSender implements LogsDataSender for OTLP logs exporterType. type OTLPLogsDataSender struct { otlpDataSender - consumer.LogsConsumer + consumer.Logs } // Ensure OTLPLogsDataSender implements LogDataSender. @@ -511,14 +511,14 @@ func (olds *OTLPLogsDataSender) Start() error { return err } - olds.LogsConsumer = exp + olds.Logs = exp return exp.Start(context.Background(), olds) } // ZipkinDataSender implements TraceDataSender for Zipkin http exporterType. type ZipkinDataSender struct { DataSenderBase - consumer.TracesConsumer + consumer.Traces } // Ensure ZipkinDataSender implements TraceDataSender. @@ -549,7 +549,7 @@ func (zs *ZipkinDataSender) Start() error { return err } - zs.TracesConsumer = exp + zs.Traces = exp return exp.Start(context.Background(), zs) } @@ -567,7 +567,7 @@ func (zs *ZipkinDataSender) ProtocolName() string { type PrometheusDataSender struct { DataSenderBase - consumer.MetricsConsumer + consumer.Metrics namespace string } @@ -593,7 +593,7 @@ func (pds *PrometheusDataSender) Start() error { return err } - pds.MetricsConsumer = exp + pds.Metrics = exp return exp.Start(context.Background(), pds) }