Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Refactor service/host into service/internal/graph #10854

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 0 additions & 74 deletions service/host.go

This file was deleted.

164 changes: 164 additions & 0 deletions service/internal/graph/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"net/http"
"path"
"runtime"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/zpages"
)

// TODO: remove as part of https://github.com/open-telemetry/opentelemetry-collector/issues/7370 for service 1.0
type getExporters interface {
GetExporters() map[component.DataType]map[component.ID]component.Component
}

var _ getExporters = (*Host)(nil)
var _ component.Host = (*Host)(nil)

type Host struct {
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
AsyncErrorChannel chan error
Receivers *receiver.Builder
Processors *processor.Builder
Exporters *exporter.Builder
Connectors *connector.Builder
Extensions *extension.Builder

BuildInfo component.BuildInfo

Pipelines *Graph
ServiceExtensions *extensions.Extensions
}

func (host *Host) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
switch kind {
case component.KindReceiver:
return host.Receivers.Factory(componentType)
case component.KindProcessor:
return host.Processors.Factory(componentType)
case component.KindExporter:
return host.Exporters.Factory(componentType)
case component.KindConnector:
return host.Connectors.Factory(componentType)
case component.KindExtension:
return host.Extensions.Factory(componentType)
}
return nil
}

func (host *Host) GetExtensions() map[component.ID]component.Component {
return host.ServiceExtensions.GetExtensions()
}

// Deprecated: [0.79.0] This function will be removed in the future.
// Several components in the contrib repository use this function so it cannot be removed
// before those cases are removed. In most cases, use of this function can be replaced by a
// connector. See https://github.com/open-telemetry/opentelemetry-collector/issues/7370 and
// https://github.com/open-telemetry/opentelemetry-collector/pull/7390#issuecomment-1483710184
// for additional information.
func (host *Host) GetExporters() map[component.DataType]map[component.ID]component.Component {
return host.Pipelines.GetExporters()
}

func (host *Host) NotifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) {
host.ServiceExtensions.NotifyComponentStatusChange(source, event)
if event.Status() == component.StatusFatalError {
host.AsyncErrorChannel <- event.Err()
}
}

const (
// Paths
zServicePath = "servicez"
zPipelinePath = "pipelinez"
zExtensionPath = "extensionz"
zFeaturePath = "featurez"
)

var (
// InfoVar is a singleton instance of the Info struct.
runtimeInfoVar [][2]string
)

func init() {
runtimeInfoVar = [][2]string{
{"StartTimestamp", time.Now().String()},
{"Go", runtime.Version()},
{"OS", runtime.GOOS},
{"Arch", runtime.GOARCH},
// Add other valuable runtime information here.
}
}

func (host *Host) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.HandleFunc(path.Join(pathPrefix, zServicePath), host.zPagesRequest)
mux.HandleFunc(path.Join(pathPrefix, zPipelinePath), host.Pipelines.HandleZPages)
mux.HandleFunc(path.Join(pathPrefix, zExtensionPath), host.ServiceExtensions.HandleZPages)
mux.HandleFunc(path.Join(pathPrefix, zFeaturePath), handleFeaturezRequest)
}

func (host *Host) zPagesRequest(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Service " + host.BuildInfo.Command})
zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Build Info", Properties: getBuildInfoProperties(host.BuildInfo)})
zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Runtime Info", Properties: runtimeInfoVar})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: "Pipelines",
ComponentEndpoint: zPipelinePath,
Link: true,
})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: "Extensions",
ComponentEndpoint: zExtensionPath,
Link: true,
})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: "Features",
ComponentEndpoint: zFeaturePath,
Link: true,
})
zpages.WriteHTMLPageFooter(w)
}

func handleFeaturezRequest(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Feature Gates"})
zpages.WriteHTMLFeaturesTable(w, getFeaturesTableData())
zpages.WriteHTMLPageFooter(w)
}

func getFeaturesTableData() zpages.FeatureGateTableData {
data := zpages.FeatureGateTableData{}
featuregate.GlobalRegistry().VisitAll(func(gate *featuregate.Gate) {
data.Rows = append(data.Rows, zpages.FeatureGateTableRowData{
ID: gate.ID(),
Enabled: gate.IsEnabled(),
Description: gate.Description(),
Stage: gate.Stage().String(),
FromVersion: gate.FromVersion(),
ToVersion: gate.ToVersion(),
ReferenceURL: gate.ReferenceURL(),
})
})
return data
}

func getBuildInfoProperties(buildInfo component.BuildInfo) [][2]string {
return [][2]string{
{"Command", buildInfo.Command},
{"Description", buildInfo.Description},
{"Version", buildInfo.Version},
}
}
40 changes: 20 additions & 20 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Settings struct {
type Service struct {
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *serviceHost
host *graph.Host
collectorConf *confmap.Conf

reporter status.Reporter
Expand All @@ -81,14 +81,14 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled()
srv := &Service{
buildInfo: set.BuildInfo,
host: &serviceHost{
receivers: set.Receivers,
processors: set.Processors,
exporters: set.Exporters,
connectors: set.Connectors,
extensions: set.Extensions,
buildInfo: set.BuildInfo,
asyncErrorChannel: set.AsyncErrorChannel,
host: &graph.Host{
Receivers: set.Receivers,
Processors: set.Processors,
Exporters: set.Exporters,
Connectors: set.Connectors,
Extensions: set.Extensions,
BuildInfo: set.BuildInfo,
AsyncErrorChannel: set.AsyncErrorChannel,
},
collectorConf: set.CollectorConf,
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
}
srv.reporter = status.NewReporter(srv.host.notifyComponentStatusChange, func(err error) {
srv.reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) {
if errors.Is(err, status.ErrStatusNotReady) {
logger.Warn("Invalid transition", zap.Error(err))
}
Expand Down Expand Up @@ -200,21 +200,21 @@ func (srv *Service) Start(ctx context.Context) error {
// enable status reporting
srv.reporter.Ready()

if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil {
if err := srv.host.ServiceExtensions.Start(ctx, srv.host); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}

if srv.collectorConf != nil {
if err := srv.host.serviceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil {
if err := srv.host.ServiceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil {
return err
}
}

if err := srv.host.pipelines.StartAll(ctx, srv.host, srv.reporter); err != nil {
if err := srv.host.Pipelines.StartAll(ctx, srv.host, srv.reporter); err != nil {
return fmt.Errorf("cannot start pipelines: %w", err)
}

if err := srv.host.serviceExtensions.NotifyPipelineReady(); err != nil {
if err := srv.host.ServiceExtensions.NotifyPipelineReady(); err != nil {
return err
}

Expand Down Expand Up @@ -257,15 +257,15 @@ func (srv *Service) Shutdown(ctx context.Context) error {
// Begin shutdown sequence.
srv.telemetrySettings.Logger.Info("Starting shutdown...")

if err := srv.host.serviceExtensions.NotifyPipelineNotReady(); err != nil {
if err := srv.host.ServiceExtensions.NotifyPipelineNotReady(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
}

if err := srv.host.pipelines.ShutdownAll(ctx, srv.reporter); err != nil {
if err := srv.host.Pipelines.ShutdownAll(ctx, srv.reporter); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err))
}

if err := srv.host.serviceExtensions.Shutdown(ctx); err != nil {
if err := srv.host.ServiceExtensions.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
}

Expand All @@ -282,9 +282,9 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e
extensionsSettings := extensions.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Extensions: srv.host.extensions,
Extensions: srv.host.Extensions,
}
if srv.host.serviceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.reporter)); err != nil {
if srv.host.ServiceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.reporter)); err != nil {
return fmt.Errorf("failed to build extensions: %w", err)
}
return nil
Expand All @@ -293,7 +293,7 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e
// Creates the pipeline graph.
func (srv *Service) initGraph(ctx context.Context, set Settings, cfg Config) error {
var err error
if srv.host.pipelines, err = graph.Build(ctx, graph.Settings{
if srv.host.Pipelines, err = graph.Build(ctx, graph.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
ReceiverBuilder: set.Receivers,
Expand Down
4 changes: 2 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,10 @@ func TestServiceFatalError(t *testing.T) {

go func() {
ev := component.NewFatalErrorEvent(assert.AnError)
srv.host.notifyComponentStatusChange(&component.InstanceID{}, ev)
srv.host.NotifyComponentStatusChange(&component.InstanceID{}, ev)
}()

err = <-srv.host.asyncErrorChannel
err = <-srv.host.AsyncErrorChannel

require.ErrorIs(t, err, assert.AnError)
}
Expand Down
Loading
Loading