diff --git a/service/nodes.go b/service/nodes.go index 63612982f56..0640a4714c1 100644 --- a/service/nodes.go +++ b/service/nodes.go @@ -16,10 +16,16 @@ package service // import "go.opentelemetry.io/collector/service" import ( + "context" + "fmt" "hash/fnv" "strings" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/fanoutconsumer" ) type nodeID int64 @@ -103,3 +109,76 @@ func newConnectorNode(exprPipelineType, rcvrPipelineType component.DataType, con rcvrPipelineType: rcvrPipelineType, } } + +func (n *connectorNode) build( + ctx context.Context, + tel component.TelemetrySettings, + info component.BuildInfo, + builder *connector.Builder, + nexts []baseConsumer, +) error { + if len(nexts) == 0 { + return fmt.Errorf("connector %q has no next consumer", n.componentID) + } + set := connector.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} + set.TelemetrySettings.Logger = components.ConnectorLogger(set.TelemetrySettings.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType) + + var err error + switch n.rcvrPipelineType { + case component.DataTypeTraces: + var consumers []consumer.Traces + for _, next := range nexts { + tracesConsumer, ok := next.(consumer.Traces) + if !ok { + return fmt.Errorf("next component is not a traces consumer: %s", n.componentID) + } + consumers = append(consumers, tracesConsumer) + } + fanoutConsumer := fanoutconsumer.NewTraces(consumers) + switch n.exprPipelineType { + case component.DataTypeTraces: + n.Component, err = builder.CreateTracesToTraces(ctx, set, fanoutConsumer) + case component.DataTypeMetrics: + n.Component, err = builder.CreateMetricsToTraces(ctx, set, fanoutConsumer) + case component.DataTypeLogs: + n.Component, err = builder.CreateLogsToTraces(ctx, set, fanoutConsumer) + } + case component.DataTypeMetrics: + var consumers []consumer.Metrics + for _, next := range nexts { + metricsConsumer, ok := next.(consumer.Metrics) + if !ok { + return fmt.Errorf("next component is not a metrics consumer: %s", n.componentID) + } + consumers = append(consumers, metricsConsumer) + } + fanoutConsumer := fanoutconsumer.NewMetrics(consumers) + switch n.exprPipelineType { + case component.DataTypeTraces: + n.Component, err = builder.CreateTracesToMetrics(ctx, set, fanoutConsumer) + case component.DataTypeMetrics: + n.Component, err = builder.CreateMetricsToMetrics(ctx, set, fanoutConsumer) + case component.DataTypeLogs: + n.Component, err = builder.CreateLogsToMetrics(ctx, set, fanoutConsumer) + } + case component.DataTypeLogs: + var consumers []consumer.Logs + for _, next := range nexts { + logsConsumer, ok := next.(consumer.Logs) + if !ok { + return fmt.Errorf("next component is not a logs consumer: %s", n.componentID) + } + consumers = append(consumers, logsConsumer) + } + fanoutConsumer := fanoutconsumer.NewLogs(consumers) + switch n.exprPipelineType { + case component.DataTypeTraces: + n.Component, err = builder.CreateTracesToLogs(ctx, set, fanoutConsumer) + case component.DataTypeMetrics: + n.Component, err = builder.CreateMetricsToLogs(ctx, set, fanoutConsumer) + case component.DataTypeLogs: + n.Component, err = builder.CreateLogsToLogs(ctx, set, fanoutConsumer) + } + } + return err +}