Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add all-in-one OTEL component #2262

Merged
merged 11 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ run-crossdock.log
cmd/opentelemetry-collector/cmd/collector/opentelemetry-collector-*
cmd/opentelemetry-collector/cmd/agent/opentelemetry-agent-*
cmd/opentelemetry-collector/cmd/ingester/opentelemetry-ingester-*
cmd/opentelemetry-collector/cmd/all-in-one/opentelemetry-all-in-one-*
__pycache__
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ build-otel-agent:
build-otel-ingester:
cd ${OTEL_COLLECTOR_DIR}/cmd/ingester && $(GOBUILD) -o ./opentelemetry-ingester-$(GOOS)-$(GOARCH) $(BUILD_INFO) main.go

.PHONY: build-otel-all-in-one
build-otel-all-in-one:
cd ${OTEL_COLLECTOR_DIR}/cmd/all-in-one && $(GOBUILD) -o ./opentelemetry-all-in-one-$(GOOS)-$(GOARCH) $(BUILD_INFO) main.go

.PHONY: build-ingester
build-ingester:
$(GOBUILD) -o ./cmd/ingester/ingester-$(GOOS)-$(GOARCH) $(BUILD_INFO) ./cmd/ingester/main.go
Expand Down Expand Up @@ -278,7 +282,7 @@ build-binaries-arm64:
GOOS=linux GOARCH=arm64 $(MAKE) build-platform-binaries

.PHONY: build-platform-binaries
build-platform-binaries: build-agent build-collector build-query build-ingester build-all-in-one build-examples build-tracegen build-otel-collector build-otel-agent build-otel-ingester
build-platform-binaries: build-agent build-collector build-query build-ingester build-all-in-one build-examples build-tracegen build-otel-collector build-otel-agent build-otel-ingester build-otel-all-in-one

.PHONY: build-all-platforms
build-all-platforms: build-binaries-linux build-binaries-windows build-binaries-darwin build-binaries-s390x build-binaries-arm64
Expand All @@ -303,6 +307,7 @@ docker-images-jaeger-backend:
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-collector:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/collector/Dockerfile cmd/opentelemetry-collector/cmd/collector --build-arg ARCH=$(GOARCH)
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-agent:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/agent/Dockerfile cmd/opentelemetry-collector/cmd/agent --build-arg ARCH=$(GOARCH)
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-ingester:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/ingester/Dockerfile cmd/opentelemetry-collector/cmd/ingester --build-arg ARCH=$(GOARCH)
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-all-in-one:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/all-in-one/Dockerfile cmd/opentelemetry-collector/cmd/all-in-one --build-arg ARCH=$(GOARCH)

.PHONY: docker-images-tracegen
docker-images-tracegen:
Expand Down
7 changes: 6 additions & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ func startQuery(
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server := queryApp.NewServer(svc, qs, qOpts, opentracing.GlobalTracer())
server := queryApp.NewServer(svc.Logger, qs, qOpts, opentracing.GlobalTracer())
go func() {
pavolloffay marked this conversation as resolved.
Show resolved Hide resolved
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
}
}()
if err := server.Start(); err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
Expand Down
134 changes: 53 additions & 81 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/extension/healthcheckextension"
"go.opentelemetry.io/collector/processor/resourceprocessor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
Expand All @@ -30,40 +29,60 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/grpcplugin"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/memory"
kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka"
"github.com/jaegertracing/jaeger/ports"
)

const (
// Agent component
Agent ComponentType = iota
// Collector component
Collector
// Ingester component
Ingester
// AllInOne component
AllInOne

gRPCEndpoint = "localhost:14250"
httpThriftBinaryEndpoint = "localhost:14268"
udpThriftCompactEndpoint = "localhost:6831"
udpThriftBinaryEndpoint = "localhost:6832"
)

// CollectorConfig creates default collector configuration.
// It enables default Jaeger receivers, processors and exporters.
func CollectorConfig(storageType string, zipkinHostPort string, factories config.Factories) (*configmodels.Config, error) {
exporters, err := createExporters(storageType, factories)
// ComponentType defines component Jaeger type.
type ComponentType int

// ComponentSettings struct configures generation of the default config
type ComponentSettings struct {
ComponentType ComponentType
Factories config.Factories
StorageType string
ZipkinHostPort string
}

// CreateDefaultConfig creates default configuration.
func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) {
exporters, err := createExporters(c.ComponentType, c.StorageType, c.Factories)
if err != nil {
return nil, err
}
receivers := createCollectorReceivers(zipkinHostPort, factories)
hc := factories.Extensions["health_check"].CreateDefaultConfig()
receivers := createReceivers(c.ComponentType, c.ZipkinHostPort, c.Factories)
processors := configmodels.Processors{}
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
resProcessor := c.Factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resProcessor.Labels) > 0 {
processors[resProcessor.Name()] = resProcessor
}
hc := c.Factories.Extensions["health_check"].CreateDefaultConfig()
return &configmodels.Config{
Receivers: receivers,
Processors: processors,
Exporters: exporters,
Extensions: configmodels.Extensions{"health_check": hc},
Extensions: configmodels.Extensions{hc.Name(): hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Extensions: []string{hc.Name()},
Pipelines: configmodels.Pipelines{
"traces": {
string(configmodels.TracesDataType): {
InputType: configmodels.TracesDataType,
Receivers: receiverNames(receivers),
Processors: processorNames(processors),
Expand All @@ -74,11 +93,15 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config
}, nil
}

func createCollectorReceivers(zipkinHostPort string, factories config.Factories) configmodels.Receivers {
jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
if jaeger.Protocols == nil {
jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{}
func createReceivers(component ComponentType, zipkinHostPort string, factories config.Factories) configmodels.Receivers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be func (c ComponentSettings) createReceivers() to avoid passing params? Same for createExporters.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done this on purpose - be explicit on the required API contract.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The public API is easy to use. I like to keep the internal APIs clear in terms of the required arguments.

if component == Ingester {
kafkaReceiver := factories.Receivers[kafkaRec.TypeStr].CreateDefaultConfig().(*kafkaRec.Config)
return configmodels.Receivers{
kafkaReceiver.Name(): kafkaReceiver,
}
}

jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
// The CreateDefaultConfig is enabling protocols from flags
// we do not want to override it here
if _, ok := jaeger.Protocols["grpc"]; !ok {
Expand All @@ -95,21 +118,33 @@ func createCollectorReceivers(zipkinHostPort string, factories config.Factories)
},
}
}
if component == Agent || component == AllInOne {
enableAgentUDPEndpoints(jaeger)
}
recvs := map[string]configmodels.Receiver{
"jaeger": jaeger,
}
if zipkinHostPort != ports.PortToHostPort(0) {
if zipkinHostPort != "" && zipkinHostPort != ports.PortToHostPort(0) {
zipkin := factories.Receivers["zipkin"].CreateDefaultConfig().(*zipkinreceiver.Config)
zipkin.Endpoint = zipkinHostPort
recvs["zipkin"] = zipkin
}
return recvs
}

func createExporters(storageTypes string, factories config.Factories) (configmodels.Exporters, error) {
func createExporters(component ComponentType, storageTypes string, factories config.Factories) (configmodels.Exporters, error) {
if component == Agent {
jaegerExporter := factories.Exporters["jaeger"]
return configmodels.Exporters{
"jaeger": jaegerExporter.CreateDefaultConfig(),
}, nil
}
exporters := configmodels.Exporters{}
for _, s := range strings.Split(storageTypes, ",") {
switch s {
case "memory":
mem := factories.Exporters[memory.TypeStr].CreateDefaultConfig()
exporters[memory.TypeStr] = mem
case "cassandra":
cass := factories.Exporters[cassandra.TypeStr].CreateDefaultConfig()
exporters[cassandra.TypeStr] = cass
Expand All @@ -129,41 +164,7 @@ func createExporters(storageTypes string, factories config.Factories) (configmod
return exporters, nil
}

// AgentConfig creates default agent configuration.
// It enables Jaeger receiver with UDP endpoints and Jaeger exporter.
func AgentConfig(factories config.Factories) *configmodels.Config {
jaegerExporter := factories.Exporters["jaeger"]
exporters := configmodels.Exporters{
"jaeger": jaegerExporter.CreateDefaultConfig(),
}
hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config)
processors := configmodels.Processors{}
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resProcessor.Labels) > 0 {
processors[resProcessor.Name()] = resProcessor
}
receivers := createAgentReceivers(factories)
return &configmodels.Config{
Receivers: receivers,
Processors: processors,
Exporters: exporters,
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: receiverNames(receivers),
Processors: processorNames(processors),
Exporters: exporterNames(exporters),
},
},
},
}
}

func createAgentReceivers(factories config.Factories) configmodels.Receivers {
jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
func enableAgentUDPEndpoints(jaeger *jaegerreceiver.Config) configmodels.Receivers {
if _, ok := jaeger.Protocols["thrift_compact"]; !ok {
jaeger.Protocols["thrift_compact"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Expand All @@ -184,35 +185,6 @@ func createAgentReceivers(factories config.Factories) configmodels.Receivers {
return recvs
}

// IngesterConfig creates default ingester configuration.
// It enables Jaeger kafka receiver and storage backend.
func IngesterConfig(storageType string, factories config.Factories) (*configmodels.Config, error) {
exporters, err := createExporters(storageType, factories)
if err != nil {
return nil, err
}
kafkaReceiver := factories.Receivers[kafkaRec.TypeStr].CreateDefaultConfig().(*kafkaRec.Config)
receivers := configmodels.Receivers{
kafkaReceiver.Name(): kafkaReceiver,
}
hc := factories.Extensions["health_check"].CreateDefaultConfig()
return &configmodels.Config{
Receivers: receivers,
Exporters: exporters,
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: receiverNames(receivers),
Exporters: exporterNames(exporters),
},
},
},
}, nil
}

func receiverNames(receivers configmodels.Receivers) []string {
var names []string
for _, v := range receivers {
Expand Down
Loading