[exporter/datasetexporter]: Release unused resources after some time (open-telemetry#31293)

**Description:** Release unused resources after some time

This PR:
* upgrades the `scalyr/dataset-go` library from v0.17.0 to v0.18.0
* introduces a new configuration option, `buffer.purge_older_than`

**Link to tracking Issue:** open-telemetry#31292

**Testing:** 

The issue was in the underlying library, where I fixed it in
scalyr/dataset-go#75.

![Screenshot 2024-02-16 at 12 01 40](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/122797378/f1c80807-69de-49c4-aa62-7edd211e3b34)


**Documentation:** I have added documentation for the newly added
configuration option, `buffer.purge_older_than`.

Fixes open-telemetry#31292
martin-majlis-s1 authored and XinRanZhAWS committed Mar 13, 2024
1 parent 747b950 commit 1dec6fb
Showing 15 changed files with 66 additions and 18 deletions.
27 changes: 27 additions & 0 deletions .chloggen/datasetexporter-update-to-0.18.0.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datasetexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Release resources if they haven't been used for some time.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31292]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion cmd/configschema/go.mod
@@ -570,7 +570,7 @@ require (
 github.com/sagikazarmark/locafero v0.4.0 // indirect
 github.com/sagikazarmark/slog-shim v0.1.0 // indirect
 github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 // indirect
-github.com/scalyr/dataset-go v0.17.0 // indirect
+github.com/scalyr/dataset-go v0.18.0 // indirect
 github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
 github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
 github.com/segmentio/asm v1.2.0 // indirect
4 changes: 2 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/otelcontribcol/go.mod
@@ -600,7 +600,7 @@ require (
 github.com/sagikazarmark/slog-shim v0.1.0 // indirect
 github.com/samber/lo v1.38.1 // indirect
 github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 // indirect
-github.com/scalyr/dataset-go v0.17.0 // indirect
+github.com/scalyr/dataset-go v0.18.0 // indirect
 github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
 github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
 github.com/segmentio/asm v1.2.0 // indirect
4 changes: 2 additions & 2 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions exporter/datasetexporter/README.md
@@ -47,7 +47,8 @@ Make sure to provide the appropriate server host value in the `serverHost` attri

 - `debug` (default = false): Adds `session_key` to the server fields. It's useful for debugging throughput issues.
 - `buffer`:
-- `max_lifetime` (default = 5s): The maximum delay between sending batches from the same source.
+- `max_lifetime` (default = 5s): The maximum delay between sending batches from the same session.
+- `purge_older_than` (default = 30s): The maximum delay between receiving data for the same session after which resources associated with it are purged.
 - `group_by` (default = []): The list of attributes based on which events should be grouped. They are moved from the event attributes to the session info and shown as server fields in the UI.
 - `retry_initial_interval` (default = 5s): Time to wait after the first failure before retrying.
 - `retry_max_interval` (default = 30s): Is the upper bound on backoff.
@@ -307,7 +308,7 @@ To enable metrics you have to:
 ### Available Metrics

 Available metrics contain `dataset` in their name. There are counters related to the
-number of processed events (`events`), buffers (`buffer`), and transferred bytes (`bytes`).
+number of processed events (`events`), buffers (`buffer`), sessions (`sessions`), and transferred bytes (`bytes`).
 There are also histograms related to response times (`responseTime`) and payload size (`payloadSize`).

 There are several counters related to events/buffers:
4 changes: 4 additions & 0 deletions exporter/datasetexporter/config.go
@@ -106,13 +106,15 @@ func newDefaultLogsSettings() LogsSettings {
 }

 const bufferMaxLifetime = 5 * time.Second
+const bufferPurgeOlderThan = 30 * time.Second
 const bufferRetryInitialInterval = 5 * time.Second
 const bufferRetryMaxInterval = 30 * time.Second
 const bufferRetryMaxElapsedTime = 300 * time.Second
 const bufferRetryShutdownTimeout = 30 * time.Second

 type BufferSettings struct {
 MaxLifetime time.Duration `mapstructure:"max_lifetime"`
+PurgeOlderThan time.Duration `mapstructure:"purge_older_than"`
 GroupBy []string `mapstructure:"group_by"`
 RetryInitialInterval time.Duration `mapstructure:"retry_initial_interval"`
 RetryMaxInterval time.Duration `mapstructure:"retry_max_interval"`
@@ -124,6 +126,7 @@ type BufferSettings struct {
 func newDefaultBufferSettings() BufferSettings {
 return BufferSettings{
 MaxLifetime: bufferMaxLifetime,
+PurgeOlderThan: bufferPurgeOlderThan,
 GroupBy: []string{},
 RetryInitialInterval: bufferRetryInitialInterval,
 RetryMaxInterval: bufferRetryMaxInterval,
@@ -211,6 +214,7 @@ func (c *Config) convert() (*ExporterConfig, error) {
 Tokens: datasetConfig.DataSetTokens{WriteLog: string(c.APIKey)},
 BufferSettings: buffer_config.DataSetBufferSettings{
 MaxLifetime: c.BufferSettings.MaxLifetime,
+PurgeOlderThan: c.BufferSettings.PurgeOlderThan,
 MaxSize: buffer.LimitBufferSize,
 GroupBy: c.BufferSettings.GroupBy,
 RetryInitialInterval: c.BufferSettings.RetryInitialInterval,
7 changes: 4 additions & 3 deletions exporter/datasetexporter/config_test.go
@@ -111,8 +111,9 @@ func TestConfigString(t *testing.T) {
 APIKey: "secret",
 Debug: true,
 BufferSettings: BufferSettings{
-MaxLifetime: 123,
-GroupBy: []string{"field1", "field2"},
+MaxLifetime: 123,
+PurgeOlderThan: 567,
+GroupBy: []string{"field1", "field2"},
 },
 TracesSettings: TracesSettings{
 exportSettings: exportSettings{
@@ -142,7 +143,7 @@ func TestConfigString(t *testing.T) {
 }

 assert.Equal(t,
-"DatasetURL: https://example.com; APIKey: [REDACTED] (6); Debug: true; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:true ExportResourcePrefix:AAA ExportScopeInfo:true ExportScopePrefix:BBB DecomposeComplexMessageField:true DecomposedComplexMessagePrefix:EEE exportSettings:{ExportSeparator:CCC ExportDistinguishingSuffix:DDD}}; TracesSettings: {exportSettings:{ExportSeparator:TTT ExportDistinguishingSuffix:UUU}}; ServerHostSettings: {UseHostName:false ServerHost:foo-bar}; BackOffConfig: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
+"DatasetURL: https://example.com; APIKey: [REDACTED] (6); Debug: true; BufferSettings: {MaxLifetime:123ns PurgeOlderThan:567ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:true ExportResourcePrefix:AAA ExportScopeInfo:true ExportScopePrefix:BBB DecomposeComplexMessageField:true DecomposedComplexMessagePrefix:EEE exportSettings:{ExportSeparator:CCC ExportDistinguishingSuffix:DDD}}; TracesSettings: {exportSettings:{ExportSeparator:TTT ExportDistinguishingSuffix:UUU}}; ServerHostSettings: {UseHostName:false ServerHost:foo-bar}; BackOffConfig: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
 config.String(),
 )
 }
15 changes: 14 additions & 1 deletion exporter/datasetexporter/factory_test.go
@@ -66,6 +66,7 @@ func TestLoadConfig(t *testing.T) {
 APIKey: "key-lib",
 BufferSettings: BufferSettings{
 MaxLifetime: 345 * time.Millisecond,
+PurgeOlderThan: bufferPurgeOlderThan,
 GroupBy: []string{"attributes.container_id", "attributes.log.file.path"},
 RetryInitialInterval: bufferRetryInitialInterval,
 RetryMaxInterval: bufferRetryMaxInterval,
@@ -88,6 +89,7 @@ func TestLoadConfig(t *testing.T) {
 Debug: true,
 BufferSettings: BufferSettings{
 MaxLifetime: 3456 * time.Millisecond,
+PurgeOlderThan: 78 * time.Second,
 GroupBy: []string{"body.map.kubernetes.pod_id", "body.map.kubernetes.docker_id", "body.map.stream"},
 RetryInitialInterval: 21 * time.Second,
 RetryMaxInterval: 22 * time.Second,
@@ -158,11 +160,16 @@ type CreateTest struct {
 }

 func createExporterTests() []CreateTest {
+factory := NewFactory()
+defaultCfg := factory.CreateDefaultConfig().(*Config)
+defaultCfg.APIKey = "default-api-key"
+defaultCfg.DatasetURL = "https://app.eu.scalyr.com"
+
 return []CreateTest{
 {
 name: "broken",
 config: &Config{},
-expectedError: fmt.Errorf("cannot get DataSetExporter: cannot convert config: DatasetURL: ; APIKey: [REDACTED] (0); Debug: false; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:false ExportResourcePrefix: ExportScopeInfo:false ExportScopePrefix: DecomposeComplexMessageField:false DecomposedComplexMessagePrefix: exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; TracesSettings: {exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; ServerHostSettings: {UseHostName:false ServerHost:}; BackOffConfig: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; config is not valid: api_key is required"),
+expectedError: fmt.Errorf("cannot get DataSetExporter: cannot convert config: DatasetURL: ; APIKey: [REDACTED] (0); Debug: false; BufferSettings: {MaxLifetime:0s PurgeOlderThan:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:false ExportResourcePrefix: ExportScopeInfo:false ExportScopePrefix: DecomposeComplexMessageField:false DecomposedComplexMessagePrefix: exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; TracesSettings: {exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; ServerHostSettings: {UseHostName:false ServerHost:}; BackOffConfig: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; config is not valid: api_key is required"),
 },
 {
 name: "valid",
@@ -171,6 +178,7 @@ func createExporterTests() []CreateTest {
 APIKey: "key-lib",
 BufferSettings: BufferSettings{
 MaxLifetime: 12345,
+PurgeOlderThan: 78901,
 GroupBy: []string{"attributes.container_id"},
 RetryInitialInterval: time.Second,
 RetryMaxInterval: time.Minute,
@@ -188,5 +196,10 @@ func createExporterTests() []CreateTest {
 },
 expectedError: nil,
 },
+{
+name: "default",
+config: defaultCfg,
+expectedError: nil,
+},
 }
 }
2 changes: 1 addition & 1 deletion exporter/datasetexporter/go.mod
@@ -6,7 +6,7 @@ require (
 github.com/google/uuid v1.6.0
 // github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage v0.77.0
 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.94.0
-github.com/scalyr/dataset-go v0.17.0
+github.com/scalyr/dataset-go v0.18.0
 github.com/stretchr/testify v1.8.4
 go.opentelemetry.io/collector/component v0.94.1
 go.opentelemetry.io/collector/confmap v0.94.1
4 changes: 2 additions & 2 deletions exporter/datasetexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions exporter/datasetexporter/logs_exporter_test.go
@@ -813,6 +813,7 @@ func TestConsumeLogsShouldSucceed(t *testing.T) {
 Debug: true,
 BufferSettings: BufferSettings{
 MaxLifetime: 2 * time.Second,
+PurgeOlderThan: 10 * time.Second,
 GroupBy: []string{"attributes.container_id"},
 RetryInitialInterval: time.Second,
 RetryMaxInterval: time.Minute,
1 change: 1 addition & 0 deletions exporter/datasetexporter/testdata/config.yaml
@@ -17,6 +17,7 @@ dataset/full:
 debug: true
 buffer:
 max_lifetime: 3456ms
+purge_older_than: 78s
 group_by:
 - body.map.kubernetes.pod_id
 - body.map.kubernetes.docker_id
2 changes: 1 addition & 1 deletion go.mod
@@ -570,7 +570,7 @@ require (
 github.com/sagikazarmark/slog-shim v0.1.0 // indirect
 github.com/samber/lo v1.38.1 // indirect
 github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 // indirect
-github.com/scalyr/dataset-go v0.17.0 // indirect
+github.com/scalyr/dataset-go v0.18.0 // indirect
 github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
 github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
 github.com/segmentio/asm v1.2.0 // indirect
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
