[exporter/loadbalancing] Add top level sending_queue, retry_on_failure and timeout settings #36094

Open
wants to merge 7 commits into
base: main
29 changes: 29 additions & 0 deletions .chloggen/feat_loadbalancing-exporter-queue.yaml
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adding sending_queue, retry_on_failure and timeout settings to loadbalancing exporter configuration

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35378,16826]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  When switching to the top-level sending_queue configuration, users should carefully review the queue size.
  In some rare cases, setting the top-level queue size to n*queueSize might not be enough to prevent data loss.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
78 changes: 74 additions & 4 deletions exporter/loadbalancingexporter/README.md
@@ -55,7 +55,7 @@ The `loadbalancingexporter` will, irrespective of the chosen resolver (`static`,

## Configuration

Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the processor.
Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the exporter.

* The `otlp` property configures the template used for building the OTLP exporter. Refer to the OTLP Exporter documentation for information on which options are available. Note that the `endpoint` property should not be set and will be overridden by this exporter with the backend endpoint.
* The `resolver` accepts a `static` node, a `dns`, a `k8s` service or `aws_cloud_map`. If all four are specified, an `errMultipleResolversProvided` error will be thrown.
@@ -90,6 +90,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `traceID`: Routes spans based on their `traceID`. Invalid for metrics.
* `metric`: Routes metrics based on their metric name. Invalid for spans.
* `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all its attributes, plus the attributes and identifying information of its resource, scope, and metric data
* The loadbalancing exporter supports the standard set of [queuing, batching, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
Review comment (Member):

This needs more documentation: what happens when a failure occurs and then the ring is changed? Will it be directed to a new backend (yes!)? This expected behavior should be explicitly documented to our users.


Simple example

@@ -117,9 +118,9 @@ exporters:
- backend-2:4317
- backend-3:4317
- backend-4:4317
# Notice to config a headless service DNS in Kubernetes
# Notice to config a headless service DNS in Kubernetes
# dns:
# hostname: otelcol-headless.observability.svc.cluster.local
# hostname: otelcol-headless.observability.svc.cluster.local

service:
pipelines:
@@ -137,6 +138,75 @@ service:
- loadbalancing
```

Persistent queue, retry and timeout usage example:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317

processors:

exporters:
  loadbalancing:
    timeout: 10s
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 300s
    sending_queue:
      # note that otlp.sending_queue is disabled automatically
      # in this case to avoid data loss
      enabled: true
      num_consumers: 2
      queue_size: 1000
      storage: file_storage/otc
    routing_key: "service"
    protocol:
      otlp:
        # all options from the OTLP exporter are supported
        # except the endpoint
        timeout: 1s
        # has no effect because loadbalancing.sending_queue
        # is enabled
        sending_queue:
          enabled: true
    resolver:
      static:
        hostnames:
        - backend-1:4317
        - backend-2:4317
        - backend-3:4317
        - backend-4:4317
      # Notice to config a headless service DNS in Kubernetes
      # dns:
      #   hostname: otelcol-headless.observability.svc.cluster.local

extensions:
  file_storage/otc:
    directory: /var/lib/storage/otc
    timeout: 10s

service:
  # must match the extension ID declared above
  extensions: [file_storage/otc]
  pipelines:
    traces:
      receivers:
        - otlp
      processors: []
      exporters:
        - loadbalancing
    logs:
      receivers:
        - otlp
      processors: []
      exporters:
        - loadbalancing
```

Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))

```yaml
@@ -334,7 +404,7 @@ service:

## Metrics

The following metrics are recorded by this processor:
The following metrics are recorded by this exporter:

* `otelcol_loadbalancer_num_resolutions` represents the total number of resolutions performed by the resolver specified in the tag `resolver`, split by their outcome (`success=true|false`). For the static resolver, this should always be `1` with the tag `success=true`.
* `otelcol_loadbalancer_num_backends` informs how many backends are currently in use. It should always match the number of items specified in the configuration file in case the `static` resolver is used, and should eventually (seconds) catch up with the DNS changes. Note that DNS caches that might exist between the load balancer and the record authority will influence how long it takes for the load balancer to see the change.
6 changes: 6 additions & 0 deletions exporter/loadbalancingexporter/config.go
@@ -7,6 +7,8 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/otlpexporter"
)

@@ -30,6 +32,10 @@ const (

// Config defines configuration for the exporter.
type Config struct {
TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`

Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
94 changes: 88 additions & 6 deletions exporter/loadbalancingexporter/factory.go
@@ -7,14 +7,22 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
)

const (
zapEndpointKey = "endpoint"
)

// NewFactory creates a factory for the exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
@@ -32,20 +40,94 @@ func createDefaultConfig() component.Config {
otlpDefaultCfg.Endpoint = "placeholder:4317"

return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
Review comment (Member):

This behavior should be clearly documented. I think it would even make sense to log a warning if users try to use that option.

That said, there ARE users relying on this feature at the moment. What should we do about them? I think we should copy their current values to the load-balancer level, which would give them roughly the same behavior they have today.

So, here's how it could work:

  1. if the OTLP Exporter options do not include the resiliency options, use our new defaults
  2. if they do, copy them from the OTLP section into the top-level config and reset the OTLP ones to their default values, writing a log message stating that this is happening
  3. if there are options at both levels, the ones at the load-balancing level win, as that's the newest one and we can assume it's the latest intent from the user (but log a WARN in this case, it's serious enough)

I don't think we need a feature flag or deprecation plan for this.
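
The three rules above could be sketched roughly as follows. This is only an illustration under stated assumptions: `queueSettings` is a hypothetical stand-in for the real queue configuration type, `resolve` is an invented helper, a nil pointer is used to mean "the user did not set this section", and resetting the OTLP-level settings to defaults in rule 3 is an assumption the comment does not spell out. The default values shown are illustrative only.

```go
package main

import "fmt"

// queueSettings is a hypothetical stand-in for the exporter's queue config.
type queueSettings struct {
	Enabled   bool
	QueueSize int
}

// resolve applies the three proposed rules. A nil pointer means the user did
// not set that section. It returns the effective top-level settings, the
// effective OTLP-level settings, and a log message (empty if nothing to report).
func resolve(top, otlp *queueSettings, defaults queueSettings) (queueSettings, queueSettings, string) {
	switch {
	case otlp == nil && top == nil:
		// Rule 1: nothing set under otlp, so use the new defaults.
		return defaults, defaults, ""
	case otlp == nil:
		// Rule 1 with an explicit top-level section: use it as given.
		return *top, defaults, ""
	case top == nil:
		// Rule 2: copy the OTLP-level values up and reset the OTLP level.
		return *otlp, defaults, "INFO: moved otlp.sending_queue to the top level"
	default:
		// Rule 3: both levels set, the top level wins (assumed: OTLP level is reset).
		return *top, defaults, "WARN: sending_queue set at both levels; top level wins"
	}
}

func main() {
	defaults := queueSettings{Enabled: true, QueueSize: 1000} // illustrative values
	legacy := &queueSettings{Enabled: true, QueueSize: 5000}

	top, child, msg := resolve(nil, legacy, defaults)
	fmt.Println(top, child, msg)
}
```

The same shape would apply to the retry and timeout sections; only the queue is shown to keep the sketch short.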

QueueSettings: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
Protocol: Protocol{
OTLP: *otlpDefaultCfg,
},
}
}

func createTracesExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) {
return newTracesExporter(params, cfg)
func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config {
oCfg := cfg.Protocol.OTLP
oCfg.Endpoint = endpoint

return oCfg
}

func buildExporterSettings(params exporter.Settings, endpoint string) exporter.Settings {
// Override child exporter ID to segregate metrics from loadbalancing top level
childName := endpoint
if params.ID.Name() != "" {
childName = fmt.Sprintf("%s_%s", params.ID.Name(), childName)
}
params.ID = component.NewIDWithName(params.ID.Type(), childName)
// Add "endpoint" attribute to child exporter logger to segregate logs from loadbalancing top level
params.Logger = params.Logger.With(zap.String(zapEndpointKey, endpoint))

return params
}

func createTracesExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) {
c := cfg.(*Config)
exporter, err := newTracesExporter(params, cfg)
if err != nil {
return nil, fmt.Errorf("cannot configure loadbalancing traces exporter: %w", err)
}

return exporterhelper.NewTraces(
ctx,
params,
cfg,
exporter.ConsumeTraces,
exporterhelper.WithStart(exporter.Start),
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithCapabilities(exporter.Capabilities()),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}

func createLogsExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) {
return newLogsExporter(params, cfg)
func createLogsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) {
c := cfg.(*Config)
exporter, err := newLogsExporter(params, cfg)
if err != nil {
return nil, fmt.Errorf("cannot configure loadbalancing logs exporter: %w", err)
}

return exporterhelper.NewLogs(
ctx,
params,
cfg,
exporter.ConsumeLogs,
exporterhelper.WithStart(exporter.Start),
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithCapabilities(exporter.Capabilities()),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}

func createMetricsExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Metrics, error) {
return newMetricsExporter(params, cfg)
func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Metrics, error) {
c := cfg.(*Config)
exporter, err := newMetricsExporter(params, cfg)
if err != nil {
return nil, fmt.Errorf("cannot configure loadbalancing metrics exporter: %w", err)
}

return exporterhelper.NewMetrics(
ctx,
params,
cfg,
exporter.ConsumeMetrics,
exporterhelper.WithStart(exporter.Start),
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithCapabilities(exporter.Capabilities()),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
65 changes: 65 additions & 0 deletions exporter/loadbalancingexporter/factory_test.go
@@ -5,10 +5,20 @@ package loadbalancingexporter

import (
"context"
"fmt"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
)

func TestTracesExporterGetsCreatedWithValidConfiguration(t *testing.T) {
@@ -58,3 +68,58 @@ func TestOTLPConfigIsValid(t *testing.T) {
// verify
assert.NoError(t, otlpCfg.Validate())
}

func TestBuildExporterConfig(t *testing.T) {
// prepare
factories, err := otelcoltest.NopFactories()
require.NoError(t, err)

factories.Exporters[metadata.Type] = NewFactory()
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594
// nolint:staticcheck
cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

c := cfg.Exporters[component.NewID(metadata.Type)]
require.NotNil(t, c)

// test
defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config)
exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint")

// verify
grpcSettings := defaultCfg.ClientConfig
grpcSettings.Endpoint = "the-endpoint"
assert.Equal(t, grpcSettings, exporterCfg.ClientConfig)

assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig)
assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig)
assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig)
}

func TestBuildExporterSettings(t *testing.T) {
// prepare
creationParams := exportertest.NewNopSettings()
testEndpoint := "the-endpoint"
observedZapCore, observedLogs := observer.New(zap.InfoLevel)
creationParams.Logger = zap.New(observedZapCore)

// test
exporterParams := buildExporterSettings(creationParams, testEndpoint)
exporterParams.Logger.Info("test")

// verify
expectedID := component.NewIDWithName(
creationParams.ID.Type(),
fmt.Sprintf("%s_%s", creationParams.ID.Name(), testEndpoint),
)
assert.Equal(t, expectedID, exporterParams.ID)

allLogs := observedLogs.All()
require.Equal(t, 1, observedLogs.Len())
assert.Contains(t,
allLogs[0].Context,
zap.String(zapEndpointKey, testEndpoint),
)
}
2 changes: 1 addition & 1 deletion exporter/loadbalancingexporter/go.mod
@@ -13,6 +13,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
go.opentelemetry.io/collector/config/configretry v1.18.0
go.opentelemetry.io/collector/config/configtelemetry v0.112.0
go.opentelemetry.io/collector/confmap v1.18.0
go.opentelemetry.io/collector/consumer v0.112.0
@@ -113,7 +114,6 @@ require (
go.opentelemetry.io/collector/config/configgrpc v0.112.0 // indirect
go.opentelemetry.io/collector/config/confignet v1.18.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.18.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.18.0 // indirect
go.opentelemetry.io/collector/config/configtls v1.18.0 // indirect
go.opentelemetry.io/collector/config/internal v0.112.0 // indirect
go.opentelemetry.io/collector/confmap/provider/envprovider v1.18.0 // indirect
4 changes: 3 additions & 1 deletion exporter/loadbalancingexporter/log_exporter.go
@@ -41,7 +41,9 @@ func newLogsExporter(params exporter.Settings, cfg component.Config) (*logExport
exporterFactory := otlpexporter.NewFactory()
cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) {
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
return exporterFactory.CreateLogs(ctx, params, &oCfg)
oParams := buildExporterSettings(params, endpoint)

return exporterFactory.CreateLogs(ctx, oParams, &oCfg)
}

lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry)