Skip to content

Commit

Permalink
[chore] embed build[Exporter|Processor|Receiver|Connector] into nodes (
Browse files Browse the repository at this point in the history
…#7358)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Mar 13, 2023
1 parent a70c017 commit 10ae965
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 346 deletions.
48 changes: 31 additions & 17 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,29 @@ import (
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

// graphSettings holds configuration for building builtPipelines.
type graphSettings struct {
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo

ReceiverBuilder *receiver.Builder
ProcessorBuilder *processor.Builder
ExporterBuilder *exporter.Builder
ConnectorBuilder *connector.Builder

// PipelineConfigs is a map of component.ID to PipelineConfig.
PipelineConfigs map[component.ID]*PipelineConfig
}

type pipelinesGraph struct {
// All component instances represented as nodes, with directed edges indicating data flow.
componentGraph *simple.DirectedGraph
Expand All @@ -39,7 +57,7 @@ type pipelinesGraph struct {
pipelines map[component.ID]*pipelineNodes
}

func buildPipelinesGraph(ctx context.Context, set pipelinesSettings) (*pipelinesGraph, error) {
func buildPipelinesGraph(ctx context.Context, set graphSettings) (*pipelinesGraph, error) {
pipelines := &pipelinesGraph{
componentGraph: simple.NewDirectedGraph(),
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
Expand All @@ -56,7 +74,7 @@ func buildPipelinesGraph(ctx context.Context, set pipelinesSettings) (*pipelines
}

// Creates a node for each instance of a component and adds it to the graph
func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
func (g *pipelinesGraph) createNodes(set graphSettings) {
// Keep track of connectors and where they are used. (map[connectorID][]pipelineID)
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)
Expand All @@ -68,7 +86,7 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
rcvrNode := g.createReceiver(pipelineID, recvID)
rcvrNode := g.createReceiver(pipelineID.Type(), recvID)
pipe.receivers[rcvrNode.ID()] = rcvrNode
}

Expand All @@ -85,7 +103,7 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
expNode := g.createExporter(pipelineID, exprID)
expNode := g.createExporter(pipelineID.Type(), exprID)
pipe.exporters[expNode.ID()] = expNode
}
}
Expand All @@ -101,8 +119,8 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
}
}

func (g *pipelinesGraph) createReceiver(pipelineID, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineID, recvID)
func (g *pipelinesGraph) createReceiver(pipelineType component.DataType, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineType, recvID)
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
return node.(*receiverNode)
}
Expand All @@ -116,8 +134,8 @@ func (g *pipelinesGraph) createProcessor(pipelineID, procID component.ID) *proce
return procNode
}

func (g *pipelinesGraph) createExporter(pipelineID, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineID, exprID)
func (g *pipelinesGraph) createExporter(pipelineType component.DataType, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineType, exprID)
if node := g.componentGraph.Node(expNode.ID()); node != nil {
return node.(*exporterNode)
}
Expand Down Expand Up @@ -156,7 +174,7 @@ func (g *pipelinesGraph) createEdges() {
}
}

func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSettings) error {
func (g *pipelinesGraph) buildComponents(ctx context.Context, set graphSettings) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return cycleErr(err, topo.DirectedCyclesIn(g.componentGraph))
Expand All @@ -166,17 +184,13 @@ func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSetti
node := nodes[i]
switch n := node.(type) {
case *receiverNode:
n.Component, err = buildReceiver(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ReceiverBuilder,
component.NewIDWithName(n.pipelineType, "*"), g.nextConsumers(n.ID()))
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
case *processorNode:
n.Component, err = buildProcessor(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ProcessorBuilder,
n.pipelineID, g.nextConsumers(n.ID())[0])
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
case *exporterNode:
n.Component, err = buildExporter(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ExporterBuilder,
component.NewIDWithName(n.pipelineType, "*"))
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
case *connectorNode:
n.Component, err = buildConnector(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ConnectorBuilder,
n.exprPipelineType, n.rcvrPipelineType, g.nextConsumers(n.ID()))
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID()))
case *capabilitiesNode:
cap := consumer.Capabilities{MutatesData: false}
for _, proc := range g.pipelines[n.pipelineID].processors {
Expand Down
151 changes: 137 additions & 14 deletions service/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -595,7 +597,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Build the pipeline
set := pipelinesSettings{
set := graphSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: receiver.NewBuilder(
Expand Down Expand Up @@ -831,7 +833,7 @@ func TestConnectorRouter(t *testing.T) {
logsLeftID := component.NewIDWithName("logs", "left")

ctx := context.Background()
set := pipelinesSettings{
set := graphSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: receiver.NewBuilder(
Expand Down Expand Up @@ -1025,7 +1027,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("bf")},
},
},
expected: "failed to create \"bf\" exporter, in pipeline \"logs/*\": telemetry type is not supported",
expected: "failed to create \"bf\" exporter for data type \"logs\": telemetry type is not supported",
},
{
name: "not_supported_exporter_metrics",
Expand All @@ -1041,7 +1043,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("bf")},
},
},
expected: "failed to create \"bf\" exporter, in pipeline \"metrics/*\": telemetry type is not supported",
expected: "failed to create \"bf\" exporter for data type \"metrics\": telemetry type is not supported",
},
{
name: "not_supported_exporter_traces",
Expand All @@ -1057,7 +1059,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("bf")},
},
},
expected: "failed to create \"bf\" exporter, in pipeline \"traces/*\": telemetry type is not supported",
expected: "failed to create \"bf\" exporter for data type \"traces\": telemetry type is not supported",
},
{
name: "not_supported_processor_logs",
Expand Down Expand Up @@ -1133,7 +1135,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "failed to create \"bf\" receiver, in pipeline \"logs/*\": telemetry type is not supported",
expected: "failed to create \"bf\" receiver for data type \"logs\": telemetry type is not supported",
},
{
name: "not_supported_receiver_metrics",
Expand All @@ -1149,7 +1151,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "failed to create \"bf\" receiver, in pipeline \"metrics/*\": telemetry type is not supported",
expected: "failed to create \"bf\" receiver for data type \"metrics\": telemetry type is not supported",
},
{
name: "not_supported_receiver_traces",
Expand All @@ -1165,7 +1167,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "failed to create \"bf\" receiver, in pipeline \"traces/*\": telemetry type is not supported",
expected: "failed to create \"bf\" receiver for data type \"traces\": telemetry type is not supported",
},
{
name: "not_supported_connector_traces_traces.yaml",
Expand Down Expand Up @@ -1664,7 +1666,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")},
},
},
expected: "failed to create \"nop/1\" exporter, in pipeline \"traces/*\": exporter \"nop/1\" is not configured",
expected: "failed to create \"nop/1\" exporter for data type \"traces\": exporter \"nop/1\" is not configured",
},
{
name: "unknown_exporter_factory",
Expand All @@ -1680,7 +1682,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("unknown")},
},
},
expected: "failed to create \"unknown\" exporter, in pipeline \"traces/*\": exporter factory not available for: \"unknown\"",
expected: "failed to create \"unknown\" exporter for data type \"traces\": exporter factory not available for: \"unknown\"",
},
{
name: "unknown_processor_config",
Expand Down Expand Up @@ -1736,7 +1738,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "failed to create \"nop/1\" receiver, in pipeline \"logs/*\": receiver \"nop/1\" is not configured",
expected: "failed to create \"nop/1\" receiver for data type \"logs\": receiver \"nop/1\" is not configured",
},
{
name: "unknown_receiver_factory",
Expand All @@ -1752,7 +1754,7 @@ func TestGraphBuildErrors(t *testing.T) {
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "failed to create \"unknown\" receiver, in pipeline \"logs/*\": receiver factory not available for: \"unknown\"",
expected: "failed to create \"unknown\" receiver for data type \"logs\": receiver factory not available for: \"unknown\"",
},
{
name: "unknown_connector_factory",
Expand Down Expand Up @@ -1781,7 +1783,7 @@ func TestGraphBuildErrors(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
set := pipelinesSettings{
set := graphSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Telemetry: componenttest.NewNopTelemetrySettings(),
ReceiverBuilder: receiver.NewBuilder(
Expand Down Expand Up @@ -1828,7 +1830,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
nopExporterFactory := exportertest.NewNopFactory()
nopConnectorFactory := connectortest.NewNopFactory()

set := pipelinesSettings{
set := graphSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: receiver.NewBuilder(
Expand Down Expand Up @@ -2006,3 +2008,124 @@ func expectedInstances(m map[component.ID]*PipelineConfig, pID component.ID) (in
}
return r, e
}

func newBadReceiverFactory() receiver.Factory {
return receiver.NewFactory("bf", func() component.Config {
return &struct{}{}
})
}

func newBadProcessorFactory() processor.Factory {
return processor.NewFactory("bf", func() component.Config {
return &struct{}{}
})
}

func newBadExporterFactory() exporter.Factory {
return exporter.NewFactory("bf", func() component.Config {
return &struct{}{}
})
}

func newBadConnectorFactory() connector.Factory {
return connector.NewFactory("bf", func() component.Config {
return &struct{}{}
})
}

func newErrReceiverFactory() receiver.Factory {
return receiver.NewFactory("err",
func() component.Config { return &struct{}{} },
receiver.WithTraces(func(context.Context, receiver.CreateSettings, component.Config, consumer.Traces) (receiver.Traces, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
receiver.WithLogs(func(context.Context, receiver.CreateSettings, component.Config, consumer.Logs) (receiver.Logs, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
receiver.WithMetrics(func(context.Context, receiver.CreateSettings, component.Config, consumer.Metrics) (receiver.Metrics, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
)
}

func newErrProcessorFactory() processor.Factory {
return processor.NewFactory("err",
func() component.Config { return &struct{}{} },
processor.WithTraces(func(context.Context, processor.CreateSettings, component.Config, consumer.Traces) (processor.Traces, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
processor.WithLogs(func(context.Context, processor.CreateSettings, component.Config, consumer.Logs) (processor.Logs, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
processor.WithMetrics(func(context.Context, processor.CreateSettings, component.Config, consumer.Metrics) (processor.Metrics, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
)
}

func newErrExporterFactory() exporter.Factory {
return exporter.NewFactory("err",
func() component.Config { return &struct{}{} },
exporter.WithTraces(func(context.Context, exporter.CreateSettings, component.Config) (exporter.Traces, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
exporter.WithLogs(func(context.Context, exporter.CreateSettings, component.Config) (exporter.Logs, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
exporter.WithMetrics(func(context.Context, exporter.CreateSettings, component.Config) (exporter.Metrics, error) {
return &errComponent{}, nil
}, component.StabilityLevelUndefined),
)
}

func newErrConnectorFactory() connector.Factory {
return connector.NewFactory("err", func() component.Config {
return &struct{}{}
},
connector.WithTracesToTraces(func(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Traces, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),
connector.WithTracesToMetrics(func(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Traces, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),
connector.WithTracesToLogs(func(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Traces, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),

connector.WithMetricsToTraces(func(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Metrics, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),
connector.WithMetricsToMetrics(func(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Metrics, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),
connector.WithMetricsToLogs(func(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Metrics, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),

connector.WithLogsToTraces(func(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Logs, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),
connector.WithLogsToMetrics(func(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Logs, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),
connector.WithLogsToLogs(func(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Logs, error) {
return &errComponent{}, nil
}, component.StabilityLevelUnmaintained),
)
}

type errComponent struct {
consumertest.Consumer
}

func (e errComponent) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e errComponent) Start(context.Context, component.Host) error {
return errors.New("my error")
}

func (e errComponent) Shutdown(context.Context) error {
return errors.New("my error")
}
Loading

0 comments on commit 10ae965

Please sign in to comment.