Skip to content

Commit

Permalink
Catchup with otel version since stackdriver released 0.17.0 (#2613)
Browse files Browse the repository at this point in the history
* Catchup with otel version since stackdriver released 0.17.0

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Update README

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Mar 9, 2021
1 parent 16d8ca9 commit d58b47e
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 192 deletions.
55 changes: 42 additions & 13 deletions exporter/stackdriverexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@ The following configuration options are supported:
- `number_of_workers` (optional): NumberOfWorkers sets the number of go rountines that send requests. The minimum number of workers is 1.
- `resource_mappings` (optional): ResourceMapping defines mapping of resources from source (OpenCensus) to target (Stackdriver).
- `label_mappings` (optional): Optional flag signals whether we can proceed with transformation if a label is missing in the resource.
- `retry_on_failure` (optional): Configuration for how to handle retries when sending data to Google Cloud fails.
- `enabled` (default = true)
- `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `enabled` is `false`
- `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `enabled` is `false`
- `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `enabled` is `false`
- `sending_queue` (optional): Configuration for how to buffer traces before sending.
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`;
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per seconds.

Additional configuration for the trace exporter:

- `trace.bundle_delay_threshold` (optional): Starting from the time that the first span is added to a bundle, once this delay has passed, handle the bundle. If not set, uses the exporter default.
- `trace.bundle_count_threshold` (optional): Once a bundle has this many spans, handle the bundle. Since only one span at a time is added to a bundle, no bundle will exceed this threshold, so it also serves as a limit. If not set, uses the exporter default.
- `trace.bundle_byte_threshold` (optional): Once the number of bytes in current bundle reaches this threshold, handle the bundle. This triggers handling, but does not cap the total size of a bundle. If not set, uses the exporter default.
- `trace.bundle_byte_limit` (optional): The maximum size of a bundle, in bytes. Zero means unlimited.
- `trace.buffer_max_bytes` (optional): The maximum number of bytes that the Bundler will keep in memory before returning ErrOverflow. If not set, uses the exporter default.
Note: These `retry_on_failure` and `sending_queue` are provided (and documented) by the [Exporter Helper](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration)

Additional configuration for the metric exporter:

Expand Down Expand Up @@ -48,12 +54,15 @@ exporters:
- source_key: source.label1
target_key: target_label_1

trace:
bundle_delay_threshold: 2s
bundle_count_threshold: 50
bundle_byte_threshold: 15e3
bundle_byte_limit: 0
buffer_max_bytes: 8e6
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 120s
sending_queue:
enabled: true
num_consumers: 2
queue_size: 50

metric:
prefix: prefix
Expand All @@ -70,3 +79,23 @@ following proxy environment variables:
If set at Collector start time then exporters, regardless of protocol,
will or will not proxy traffic as defined by these environment variables.
# Recommendations
It is recommended to always run a [batch processor](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor)
and [memory limiter](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/memorylimiter) for tracing pipelines to ensure
optimal network usage and avoiding memory overruns. You may also want to run an additional
[sampler](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/probabilisticsamplerprocessor), depending on your needs.
# Deprecatations
The previous trace configuration (v0.21.0) has been deprecated in favor of the common configuration options available in OpenTelemetry. These will cause a failure to start
and should be migrated:
- `trace.bundle_delay_threshold` (optional): Use `batch` processor instead ([docs](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor)).
- `trace.bundle_count_threshold` (optional): Use `batch` processor instead ([docs](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor)).
- `trace.bundle_byte_threshold` (optional): Use `memorylimiter` processor instead ([docs](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/memorylimiter))
- `trace.bundle_byte_limit` (optional): Use `memorylimiter` processor instead ([docs](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/memorylimiter))
- `trace.buffer_max_bytes` (optional): Use `memorylimiter` processor instead ([docs](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/memorylimiter))
18 changes: 5 additions & 13 deletions exporter/stackdriverexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package stackdriverexporter

import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"google.golang.org/api/option"
Expand All @@ -30,26 +28,20 @@ type Config struct {
Endpoint string `mapstructure:"endpoint"`
// Only has effect if Endpoint is not ""
UseInsecure bool `mapstructure:"use_insecure"`

// Timeout for all API calls. If not set, defaults to 12 seconds.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
ResourceMappings []ResourceMapping `mapstructure:"resource_mappings"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

ResourceMappings []ResourceMapping `mapstructure:"resource_mappings"`
// GetClientOptions returns additional options to be passed
// to the underlying Google Cloud API client.
// Must be set programmatically (no support via declarative config).
// Optional.
GetClientOptions func() []option.ClientOption

TraceConfig TraceConfig `mapstructure:"trace"`
MetricConfig MetricConfig `mapstructure:"metric"`
NumOfWorkers int `mapstructure:"number_of_workers"`
}

type TraceConfig struct {
BundleDelayThreshold time.Duration `mapstructure:"bundle_delay_threshold"`
BundleCountThreshold int `mapstructure:"bundle_count_threshold"`
BundleByteThreshold int `mapstructure:"bundle_byte_threshold"`
BundleByteLimit int `mapstructure:"bundle_byte_limit"`
BufferMaxBytes int `mapstructure:"buffer_max_bytes"`
}

type MetricConfig struct {
Expand Down
17 changes: 10 additions & 7 deletions exporter/stackdriverexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ func TestLoadConfig(t *testing.T) {
TargetType: "target-resource2",
},
},
NumOfWorkers: 3,
TraceConfig: TraceConfig{
BundleDelayThreshold: 2 * time.Second,
BundleCountThreshold: 50,
BundleByteThreshold: 15000,
BundleByteLimit: 0,
BufferMaxBytes: 8000000,
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
MetricConfig: MetricConfig{
Prefix: "prefix",
Expand Down
2 changes: 2 additions & 0 deletions exporter/stackdriverexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func createDefaultConfig() configmodels.Exporter {
NameVal: typeStr,
},
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: defaultTimeout},
RetrySettings: exporterhelper.DefaultRetrySettings(),
QueueSettings: exporterhelper.DefaultQueueSettings(),
UserAgent: "opentelemetry-collector-contrib {{version}}",
}
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/stackdriverexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ go 1.14

require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.5
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.16.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.17.0
github.com/census-instrumentation/opencensus-proto v0.3.0
github.com/stretchr/testify v1.7.0
go.opencensus.io v0.23.0
go.opentelemetry.io/collector v0.21.1-0.20210308033310-65c4c4a1b383
go.opentelemetry.io/otel v0.17.0
go.opentelemetry.io/otel/sdk v0.16.0
go.opentelemetry.io/otel/sdk v0.17.0
go.opentelemetry.io/otel/trace v0.17.0
go.uber.org/zap v1.16.0
google.golang.org/api v0.40.0
Expand Down
10 changes: 4 additions & 6 deletions exporter/stackdriverexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.16.0 h1:ljU7eS7Fe0eGWEJxhoIjGANPEhx2f5PKTbDjvT61Kwk=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.16.0/go.mod h1:TLDTgf8D4fD8Y1DizdJKtfIjkHJZU1J+mieFB1qS5T8=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.17.0 h1:DwS87jXZh62gXN8QAG0c+qYQApxOPq1CMbVHCKt4w5o=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.17.0/go.mod h1:yZU4uaWc5QB5NYG0oAG1/1x8JKs1gOsy4sevZ9zR2Ok=
github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4=
github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw=
github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
Expand Down Expand Up @@ -139,7 +139,6 @@ github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/
github.com/aws/aws-sdk-go v1.37.8 h1:9kywcbuz6vQuTf+FD+U7FshafrHzmqUCjgAEiLuIJ8U=
github.com/aws/aws-sdk-go v1.37.8/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -1059,15 +1058,14 @@ go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/collector v0.21.1-0.20210308033310-65c4c4a1b383 h1:ZQxlOelL8x2D9uiCN0r7aiCHIttvPfyF6XDJN4t7jhs=
go.opentelemetry.io/collector v0.21.1-0.20210308033310-65c4c4a1b383/go.mod h1:sBkAGYUQSh1f+owCK0aPV2uLcUB6rPHEOWjdvuE2SdQ=
go.opentelemetry.io/otel v0.16.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA=
go.opentelemetry.io/otel v0.17.0 h1:6MKOu8WY4hmfpQ4oQn34u6rYhnf2sWf1LXYO/UFm71U=
go.opentelemetry.io/otel v0.17.0/go.mod h1:Oqtdxmf7UtEvL037ohlgnaYa1h7GtMh0NcSd9eqkC9s=
go.opentelemetry.io/otel/metric v0.17.0 h1:t+5EioN8YFXQ2EH+1j6FHCKMUj+57zIDSnSGr/mWuug=
go.opentelemetry.io/otel/metric v0.17.0/go.mod h1:hUz9lH1rNXyEwWAhIWCMFWKhYtpASgSnObJFnU26dJ0=
go.opentelemetry.io/otel/oteltest v0.17.0 h1:TyAihUowTDLqb4+m5ePAsR71xPJaTBJl4KDArIdi9k4=
go.opentelemetry.io/otel/oteltest v0.17.0/go.mod h1:JT/LGFxPwpN+nlsTiinSYjdIx3hZIGqHCpChcIZmdoE=
go.opentelemetry.io/otel/sdk v0.16.0 h1:5o+fkNsOfH5Mix1bHUApNBqeDcAYczHDa7Ix+R73K2U=
go.opentelemetry.io/otel/sdk v0.16.0/go.mod h1:Jb0B4wrxerxtBeapvstmAZvJGQmvah4dHgKSngDpiCo=
go.opentelemetry.io/otel/sdk v0.17.0 h1:eHXQwanmbtSHM/GcJYbJ8FyyH/sT9a0e+1Z9ZWkF7Ug=
go.opentelemetry.io/otel/sdk v0.17.0/go.mod h1:INs1PePjjF2hf842AXsxGTe5lH023QfLTZRFPiV/RUk=
go.opentelemetry.io/otel/trace v0.17.0 h1:SBOj64/GAOyWzs5F680yW1ITIfJkm6cJWL2YAvuL9xY=
go.opentelemetry.io/otel/trace v0.17.0/go.mod h1:bIujpqg6ZL6xUTubIUgziI1jSaUPthmabA/ygf/6Cfg=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
6 changes: 3 additions & 3 deletions exporter/stackdriverexporter/spandata.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ func pdataLinksToOTLinks(links pdata.SpanLinkSlice) []apitrace.Link {
return otLinks
}

func pdataEventsToOTMessageEvents(events pdata.SpanEventSlice) []export.Event {
func pdataEventsToOTMessageEvents(events pdata.SpanEventSlice) []apitrace.Event {
size := events.Len()
otEvents := make([]export.Event, 0, size)
otEvents := make([]apitrace.Event, 0, size)
for i := 0; i < size; i++ {
event := events.At(i)
otEvents = append(otEvents, export.Event{
otEvents = append(otEvents, apitrace.Event{
Name: event.Name(),
Attributes: pdataAttributesToOTAttributes(event.Attributes(), pdata.NewResource()),
Time: time.Unix(0, int64(event.Timestamp())),
Expand Down
2 changes: 1 addition & 1 deletion exporter/stackdriverexporter/spandata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestPDataResourceSpansToOTSpanData_endToEnd(t *testing.T) {
Name: "End-To-End Here",
StartTime: startTime,
EndTime: endTime,
MessageEvents: []trace.Event{
MessageEvents: []apitrace.Event{
{
Time: startTime,
Name: "start",
Expand Down
83 changes: 6 additions & 77 deletions exporter/stackdriverexporter/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"strings"
"time"

"contrib.go.opencensus.io/exporter/stackdriver"
cloudtrace "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
Expand All @@ -35,8 +34,6 @@ import (
"google.golang.org/grpc"
)

const name = "stackdriver"

// traceExporter is a wrapper struct of OT cloud trace exporter
type traceExporter struct {
texporter *cloudtrace.Exporter
Expand All @@ -47,14 +44,6 @@ type metricsExporter struct {
mexporter *stackdriver.Exporter
}

func (*traceExporter) Name() string {
return name
}

func (*metricsExporter) Name() string {
return name
}

func (te *traceExporter) Shutdown(ctx context.Context) error {
return te.texporter.Shutdown(ctx)
}
Expand Down Expand Up @@ -111,14 +100,6 @@ func newStackdriverTraceExporter(cfg *Config, params component.ExporterCreatePar
return nil, err
}
topts = append(topts, cloudtrace.WithTraceClientOptions(copts))
if cfg.NumOfWorkers > 0 {
topts = append(topts, cloudtrace.WithMaxNumberOfWorkers(cfg.NumOfWorkers))
}

topts, err = appendBundleOptions(topts, cfg.TraceConfig)
if err != nil {
return nil, err
}

exp, err := cloudtrace.NewExporter(topts...)
if err != nil {
Expand All @@ -134,60 +115,9 @@ func newStackdriverTraceExporter(cfg *Config, params component.ExporterCreatePar
exporterhelper.WithShutdown(tExp.Shutdown),
// Disable exporterhelper Timeout, since we are using a custom mechanism
// within exporter itself
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}))
}

func appendBundleOptions(topts []cloudtrace.Option, cfg TraceConfig) ([]cloudtrace.Option, error) {
topts, err := validateAndAppendDurationOption(topts, "BundleDelayThreshold", cfg.BundleDelayThreshold, cloudtrace.WithBundleDelayThreshold(cfg.BundleDelayThreshold))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BundleCountThreshold", cfg.BundleCountThreshold, cloudtrace.WithBundleCountThreshold(cfg.BundleCountThreshold))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BundleByteThreshold", cfg.BundleByteThreshold, cloudtrace.WithBundleByteThreshold(cfg.BundleByteThreshold))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BundleByteLimit", cfg.BundleByteLimit, cloudtrace.WithBundleByteLimit(cfg.BundleByteLimit))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BufferMaxBytes", cfg.BufferMaxBytes, cloudtrace.WithBufferMaxBytes(cfg.BufferMaxBytes))
if err != nil {
return nil, err
}

return topts, nil
}

func validateAndAppendIntOption(topts []cloudtrace.Option, name string, val int, opt cloudtrace.Option) ([]cloudtrace.Option, error) {
if val < 0 {
return nil, fmt.Errorf("invalid value for: %s", name)
}

if val > 0 {
topts = append(topts, opt)
}

return topts, nil
}

func validateAndAppendDurationOption(topts []cloudtrace.Option, name string, val time.Duration, opt cloudtrace.Option) ([]cloudtrace.Option, error) {
if val < 0 {
return nil, fmt.Errorf("invalid value for: %s", name)
}

if val > 0 {
topts = append(topts, opt)
}

return topts, nil
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithRetry(cfg.RetrySettings))
}

func newStackdriverMetricsExporter(cfg *Config, params component.ExporterCreateParams) (component.MetricsExporter, error) {
Expand Down Expand Up @@ -220,9 +150,6 @@ func newStackdriverMetricsExporter(cfg *Config, params component.ExporterCreateP
options.TraceClientOptions = copts
options.MonitoringClientOptions = copts

if cfg.NumOfWorkers > 0 {
options.NumberOfWorkers = cfg.NumOfWorkers
}
if cfg.MetricConfig.SkipCreateMetricDescriptor {
options.SkipCMD = true
}
Expand All @@ -246,7 +173,9 @@ func newStackdriverMetricsExporter(cfg *Config, params component.ExporterCreateP
exporterhelper.WithShutdown(mExp.Shutdown),
// Disable exporterhelper Timeout, since we are using a custom mechanism
// within exporter itself
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}))
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithRetry(cfg.RetrySettings))
}

// pushMetrics calls StackdriverExporter.PushMetricsProto on each element of the given metrics
Expand Down
Loading

0 comments on commit d58b47e

Please sign in to comment.