Skip to content

Commit

Permalink
Add Google Pubsub as an OTLP exporter/receiver
Browse files Browse the repository at this point in the history
Allows export and reception of OTLP data over Google Pubsub. With the
ability to set subscriptions and topics for each of the components.
  • Loading branch information
alexvanboxel committed Feb 3, 2021
1 parent 9677f5b commit 85ce288
Show file tree
Hide file tree
Showing 25 changed files with 4,490 additions and 6 deletions.
4 changes: 4 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dynatraceexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/f5cloudexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/gcloudpubsubexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/honeycombexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerthrifthttpexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
Expand All @@ -56,6 +57,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/gcloudpubsubreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver"
Expand Down Expand Up @@ -114,6 +116,7 @@ func components() (component.Factories, error) {
wavefrontreceiver.NewFactory(),
windowsperfcountersreceiver.NewFactory(),
zookeeperreceiver.NewFactory(),
gcloudpubsubreceiver.NewFactory(),
}

receivers = append(receivers, extraReceivers()...)
Expand Down Expand Up @@ -149,6 +152,7 @@ func components() (component.Factories, error) {
splunkhecexporter.NewFactory(),
stackdriverexporter.NewFactory(),
sumologicexporter.NewFactory(),
gcloudpubsubexporter.NewFactory(),
}
for _, exp := range factories.Exporters {
exporters = append(exporters, exp)
Expand Down
1 change: 1 addition & 0 deletions exporter/gcloudpubsubexporter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
23 changes: 23 additions & 0 deletions exporter/gcloudpubsubexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Google Pubsub Exporter

This exporter sends OTLP messages to a Google Cloud [Pubsub](https://cloud.google.com/pubsub) topic.

The following configuration options are supported:

* `project` (Required): The Google Cloud Project of the topics.
* `validate_existence`(Optional): Checks the existence of the topic, but this requires admin permissions to validate
the existence.
* `traces_topic` (Optional): The topic name to send OTLP trace data over, this is the name within the project.
* `metrics_topic` (Optional): The topic name to send OTLP metric data over, this is the name within the project.
* `logs_topic` (Optional): The topic name to send OTLP log data over, this is the name within the project.

```yaml
exporters:
gcloudpubsub:
project: my-project
validate_existence: false
traces_topic: otlp-traces
metrics_topic: otlp-metrics
logs_topic: otlp-logs
```
36 changes: 36 additions & 0 deletions exporter/gcloudpubsubexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcloudpubsubexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
ProjectID string `mapstructure:"project"`
UserAgent string `mapstructure:"user_agent"`
Endpoint string `mapstructure:"endpoint"`
// Only has effect if Endpoint is not ""
UseInsecure bool `mapstructure:"use_insecure"`
// Timeout for all API calls. If not set, defaults to 12 seconds.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

ValidateExistence bool `mapstructure:"validate_existence"`
MetricsTopic string `mapstructure:"metrics_topic"`
TracesTopic string `mapstructure:"traces_topic"`
LogsTopic string `mapstructure:"logs_topic"`
}
64 changes: 64 additions & 0 deletions exporter/gcloudpubsubexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcloudpubsubexporter

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

factory := NewFactory()
factories.Exporters[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), factories,
)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Exporters), 2)

r0 := cfg.Exporters["gcloudpubsub"]
assert.Equal(t, r0, factory.CreateDefaultConfig())

r1 := cfg.Exporters["gcloudpubsub/customname"].(*Config)
assert.Equal(t, r1,
&Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "gcloudpubsub/customname"},
ProjectID: "my-project",
UserAgent: "opentelemetry-collector-contrib {{version}}",
Endpoint: "test-endpoint",
UseInsecure: true,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 20 * time.Second,
},
ValidateExistence: true,
TracesTopic: "otlp-traces",
MetricsTopic: "otlp-metrics",
LogsTopic: "otlp-logs",
})
}
166 changes: 166 additions & 0 deletions exporter/gcloudpubsubexporter/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcloudpubsubexporter

import (
"context"
"fmt"

"cloud.google.com/go/pubsub"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/grpc"
)

const name = "gcloudpubsub"

// pubsubExporter is a wrapper struct of OT cloud trace exporter
type pubsubExporter struct {
instanceName string
logger *zap.Logger

tracesTopicName string
metricsTopicName string
logsTopicName string

tracesTopic *pubsub.Topic
metricsTopic *pubsub.Topic
logsTopic *pubsub.Topic

//
userAgent string
config *Config
//
client *pubsub.Client
}

func (*pubsubExporter) Name() string {
return name
}

func (ex *pubsubExporter) Start(ctx context.Context, _ component.Host) error {
if ex.client == nil {
copts, _ := ex.generateClientOptions()
client, _ := pubsub.NewClient(context.Background(), ex.config.ProjectID, copts...)
ex.client = client
}

if ex.tracesTopic == nil && ex.tracesTopicName != "" {
ex.tracesTopic = ex.client.TopicInProject(ex.tracesTopicName, ex.config.ProjectID)
if ex.config.ValidateExistence {
tctx, cancel := context.WithTimeout(ctx, ex.config.Timeout)
defer cancel()
exist, err := ex.tracesTopic.Exists(tctx)
if err != nil {
return err
}
if !exist {
return fmt.Errorf("trace subscription %s doesn't exist", ex.tracesTopic)
}
}
}
if ex.metricsTopic == nil && ex.metricsTopicName != "" {
ex.metricsTopic = ex.client.TopicInProject(ex.metricsTopicName, ex.config.ProjectID)
if ex.config.ValidateExistence {
tctx, cancel := context.WithTimeout(ctx, ex.config.Timeout)
defer cancel()
exist, err := ex.metricsTopic.Exists(tctx)
if err != nil {
return err
}
if !exist {
return fmt.Errorf("metric subscription %s doesn't exist", ex.tracesTopic)
}
}
}
if ex.logsTopic == nil && ex.logsTopicName != "" {
ex.logsTopic = ex.client.TopicInProject(ex.logsTopicName, ex.config.ProjectID)
if ex.config.ValidateExistence {
tctx, cancel := context.WithTimeout(ctx, ex.config.Timeout)
defer cancel()
exist, err := ex.logsTopic.Exists(tctx)
if err != nil {
return err
}
if !exist {
return fmt.Errorf("log subscription %s doesn't exist", ex.tracesTopic)
}
}
}
return nil
}

func (ex *pubsubExporter) Shutdown(context.Context) error {
if ex.tracesTopic != nil {
ex.tracesTopic.Stop()
ex.tracesTopic = nil
}
if ex.metricsTopic != nil {
ex.metricsTopic.Stop()
ex.metricsTopic = nil
}
if ex.logsTopic != nil {
ex.logsTopic.Stop()
ex.logsTopic = nil
}
if ex.client != nil {
ex.client.Close()
ex.client = nil
}
return nil
}

func (ex *pubsubExporter) generateClientOptions() ([]option.ClientOption, error) {
var copts []option.ClientOption
if ex.userAgent != "" {
copts = append(copts, option.WithUserAgent(ex.userAgent))
}
if ex.config.Endpoint != "" {
if ex.config.UseInsecure {
var dialOpts []grpc.DialOption
if ex.userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent))
}
conn, _ := grpc.Dial(ex.config.Endpoint, append(dialOpts, grpc.WithInsecure())...)
copts = append(copts, option.WithGRPCConn(conn))
} else {
copts = append(copts, option.WithEndpoint(ex.config.Endpoint))
}
}
return copts, nil
}

func (ex *pubsubExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
bytes, _ := td.ToOtlpProtoBytes()
message := &pubsub.Message{Data: bytes}
_ = ex.tracesTopic.Publish(ctx, message)
return nil
}

func (ex *pubsubExporter) ConsumeMetrics(ctx context.Context, td pdata.Metrics) error {
bytes, _ := td.ToOtlpProtoBytes()
message := &pubsub.Message{Data: bytes}
_ = ex.metricsTopic.Publish(ctx, message)
return nil
}

func (ex *pubsubExporter) ConsumeLogs(ctx context.Context, td pdata.Logs) error {
bytes, _ := td.ToOtlpProtoBytes()
message := &pubsub.Message{Data: bytes}
_ = ex.logsTopic.Publish(ctx, message)
return nil
}
Loading

0 comments on commit 85ce288

Please sign in to comment.