Skip to content

Commit

Permalink
[exporter/datasetexporter]: Make export of resources and scopes more …
Browse files Browse the repository at this point in the history
…flexible (#27683)

**Description:** Make export of resources and scopes more flexible

**Link to tracking Issue:** #27651 , #27649 

**Testing:** Unit tests

**Documentation:** <Describe the documentation added.>

**Note:**: This PR is on top of this PR -
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/27663/files
- since it's introducing some helper functions and it's fixing NPE.

Fixes #27651
Fixes #27649

---------

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@gmail.com>
Signed-off-by: Christian Kruse <ctkruse99@gmail.com>
Co-authored-by: Paschalis Tsilias <tpaschalis@users.noreply.github.com>
Co-authored-by: Juraci Paixão Kröhling <juraci@kroehling.de>
Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
Co-authored-by: Andrzej Stencel <astencel@sumologic.com>
Co-authored-by: hovavza <147598197+hovavza@users.noreply.github.com>
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
Co-authored-by: Brandon Johnson <brandon.johnson@bluemedora.com>
Co-authored-by: Miel Donkers <miel.donkers@gmail.com>
Co-authored-by: bryan-aguilar <46550959+bryan-aguilar@users.noreply.github.com>
Co-authored-by: sakulali <sakulali@126.com>
Co-authored-by: Christian Kruse <ctkruse99@gmail.com>
Co-authored-by: gord02 <53834349+gord02@users.noreply.github.com>
Co-authored-by: bagmeg <joonsoo181005@gmail.com>
Co-authored-by: Yang Song <songy23@users.noreply.github.com>
Co-authored-by: Pablo Baeyens <pbaeyens31+github@gmail.com>
Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
Co-authored-by: OpenTelemetry Bot <107717825+opentelemetrybot@users.noreply.github.com>
Co-authored-by: Curtis Robert <crobert@splunk.com>
Co-authored-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com>
Co-authored-by: Antoine Toulme <antoine@lunar-ocean.com>
  • Loading branch information
1 parent 56a5497 commit e347ff3
Show file tree
Hide file tree
Showing 12 changed files with 972 additions and 122 deletions.
27 changes: 27 additions & 0 deletions .chloggen/27651-make-resource-scope-more-flexible.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datasetexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make export of resources and scopes more flexible

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27651, 27649]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
142 changes: 140 additions & 2 deletions exporter/datasetexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,153 @@ Make sure to provide the appropriate server host value in the `serverHost` attri
- `retry_max_elapsed_time` (default = 300s): Is the maximum amount of time spent trying to send a buffer.
- `retry_shutdown_timeout` (default = 30s): The maximum time for which it will try to send data to the DataSet during shutdown. This value should be shorter than container's grace period.
- `logs`:
- `export_scope_info_on_event` (default = false): Include LogRecord scope information (if available) on the DataSet event.
- `decompose_complex_message_field` (default = true): Set this to false to disable decomposing complex body / message field types (e.g. a map) into separate fields.
- `export_resource_info_on_event` (default = false): Include LogRecord resource information (if available) on the DataSet event.
- `export_resource_prefix` (default = 'resource.attributes.'): A prefix string for the resource, if `export_resource_info_on_event` is enabled.
- `export_scope_info_on_event` (default = true): Include LogRecord scope information (if available) on the DataSet event.
- `export_scope_prefix` (default = 'scope.attributes.'): A prefix string for the scope, if `export_scope_info_on_event` is enabled.
- `export_separator` (default = '.'): The separator to add between keys when flattening nested structures (maps, arrays).
- `export_distinguishing_suffix` (default = '_'): A suffix string to resolve naming collisions when flattening.
- `decompose_complex_message_field` (default = false): Decompose complex body / message field types (e.g. a maps, arrays) into separate fields.
- `decomposed_complex_message_prefix` (default = 'body.map.'): A prefix string to use when a complex message is decomposed.
- `traces`:
- `export_separator` (default = '.'): The separator to add between keys when flattening nested structures (maps, arrays).
- `export_distinguishing_suffix` (default = '_'): A suffix string to resolve naming collisions when flattening.
- `server_host`:
- `server_host` (default = ''): Specifies the server host to be used for the events.
- `use_hostname` (default = true): Determines whether the `hostname` of the node should be used as the server host for the events. When set to `true`, the node's `hostname` is automatically used.
- `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- `timeout`: See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)


### Attributes

Enabled attributes are exported in the order:

1. Log properties
2. Body
3. Resource attributes
4. Scope attributes
5. Log attributes

If there is a name conflict, the `export_distinguishing_suffix` value is appended to the later attribute's name. If the `export_distinguishing_suffix` value is an empty string, then the value from the last attribute is used.

#### Example

Example LogRecord:
```
Log
- body:
- b: 1
- x: "b"
- resource:
- r: 2
- x: "r"
- scope:
- s: 3
- x: "s"
- attribute:
- a: 4
- x: "a"
- map:
- m1: 5
- m2: 6
```

Then the event will look like:
* Default settings for `logs`:
* Event:
```
- message: "{\"b\": 1, \"x\": \"b\"}"
- scope.attributes.s: 3
- scope.attributes.x: "s"
- a: 4
- x: "a"
- map.m1: 5
- map.m2: 6
```
* Everything enabled:
* Configuration:
```
logs:
export_resource_info_on_event: true
export_resource_prefix: "r."
export_scope_info_on_event: true
export_scope_prefix: "s."
decompose_complex_message_field: true
decomposed_complex_message_prefix: "m."
export_separator: "-"
export_distinguishing_suffix: "_"
```
* Event:
```
- message: "{\"b\": 1, \"x\": \"b\"}"
- m.b: 1
- m.x: "b"
- r.r: 2
- r.x: "r"
- s.s: 3
- s.x: "s"
- a: 4
- x: "a"
- map-m1: 5
- map-m2: 6
```
* Everything enabled, prefixes are empty strings:
* Configuration:
```
logs:
export_resource_info_on_event: true
export_resource_prefix: ""
export_scope_info_on_event: true
export_scope_prefix: ""
decompose_complex_message_field: true
decomposed_complex_message_prefix: ""
export_separator: "-"
export_distinguishing_suffix: "_"
```
* Event:
```
- message: "{\"b\": 1, \"x\": \"b\"}"
- b: 1
- x: "b"
- r: 2
- x_: "r"
- s: 3
- x__: "s"
- a: 4
- x___: "a"
- map-m1: 5
- map-m2: 6
```
* Everything enabled, prefixes are empty strings, suffix is empty string:
* Configuration:
```
logs:
export_resource_info_on_event: true
export_resource_prefix: ""
export_scope_info_on_event: true
export_scope_prefix: ""
decompose_complex_message_field: true
decomposed_complex_message_prefix: ""
export_separator: "-"
export_distinguishing_suffix: ""
```
* Event:
```
- message: "{\"b\": 1, \"x\": \"b\"}"
- b: 1
- r: 2
- s: 3
- a: 4
- x: "a"
- map-m1: 5
- map-m2: 6
```

Field names can have `.` dots, `_` underscores, and `-` hyphens. You must escape slashes in Search and PowerQueries. For example, search the field name `app.kubernetes.io/component` as `app.kubernetes.io\/component`.


### Example

```yaml
Expand Down
59 changes: 54 additions & 5 deletions exporter/datasetexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,90 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const exportSeparatorDefault = "."
const exportDistinguishingSuffix = "_"

// exportSettings configures separator and distinguishing suffixes for all exported fields
type exportSettings struct {
// ExportSeparator is separator used when flattening exported attributes
// Default value: .
ExportSeparator string `mapstructure:"export_separator"`

// ExportDistinguishingSuffix is suffix used to be appended to the end of attribute name in case of collision
// Default value: _
ExportDistinguishingSuffix string `mapstructure:"export_distinguishing_suffix"`
}

// newDefaultExportSettings returns the default settings for exportSettings.
func newDefaultExportSettings() exportSettings {
return exportSettings{
ExportSeparator: exportSeparatorDefault,
ExportDistinguishingSuffix: exportDistinguishingSuffix,
}
}

type TracesSettings struct {
// exportSettings configures separator and distinguishing suffixes for all exported fields
exportSettings `mapstructure:",squash"`
}

// newDefaultTracesSettings returns the default settings for TracesSettings.
func newDefaultTracesSettings() TracesSettings {
return TracesSettings{}
return TracesSettings{
exportSettings: newDefaultExportSettings(),
}
}

const logsExportResourceInfoDefault = false
const logsExportResourcePrefixDefault = "resource.attributes."
const logsExportScopeInfoDefault = true
const logsExportScopePrefixDefault = "scope.attributes."
const logsDecomposeComplexMessageFieldDefault = false
const logsDecomposedComplexMessageFieldPrefixDefault = "body.map."

type LogsSettings struct {
// ExportResourceInfo is optional flag to signal that the resource info is being exported to DataSet while exporting Logs.
// This is especially useful when reducing DataSet billable log volume.
// Default value: false.
// Default value: false
ExportResourceInfo bool `mapstructure:"export_resource_info_on_event"`

// ExportResourcePrefix is prefix for the resource attributes when they are exported (see ExportResourceInfo).
// Default value: resource.attributes.
ExportResourcePrefix string `mapstructure:"export_resource_prefix"`

// ExportScopeInfo is an optional flag that signals if scope info should be exported (when available) with each event. If scope
// information is not utilized, it makes sense to disable exporting it since it will result in increased billable log volume.
// Default value: true
ExportScopeInfo bool `mapstructure:"export_scope_info_on_event"`

// ExportScopePrefix is prefix for the scope attributes when they are exported (see ExportScopeInfo).
// Default value: scope.attributes.
ExportScopePrefix string `mapstructure:"export_scope_prefix"`

// DecomposeComplexMessageField is an optional flag to signal that message / body of complex types (e.g. a map) should be
// decomposed / deconstructed into multiple fields. This is usually done outside of the main DataSet integration on the
// client side (e.g. as part of the attribute processor or similar) or on the server side (DataSet server side JSON parser
// for message field) and that's why this functionality is disabled by default.
DecomposeComplexMessageField bool `mapstructure:"decompose_complex_message_field"`

// DecomposedComplexMessagePrefix is prefix for the decomposed complex message (see DecomposeComplexMessageField).
// Default value: body.map.
DecomposedComplexMessagePrefix string `mapstructure:"decomposed_complex_message_prefix"`

// exportSettings configures separator and distinguishing suffixes for all exported fields
exportSettings `mapstructure:",squash"`
}

// newDefaultLogsSettings returns the default settings for LogsSettings.
func newDefaultLogsSettings() LogsSettings {
return LogsSettings{
ExportResourceInfo: logsExportResourceInfoDefault,
ExportScopeInfo: logsExportScopeInfoDefault,
DecomposeComplexMessageField: logsDecomposeComplexMessageFieldDefault,
ExportResourceInfo: logsExportResourceInfoDefault,
ExportResourcePrefix: logsExportResourcePrefixDefault,
ExportScopeInfo: logsExportScopeInfoDefault,
ExportScopePrefix: logsExportScopePrefixDefault,
DecomposeComplexMessageField: logsDecomposeComplexMessageFieldDefault,
DecomposedComplexMessagePrefix: logsDecomposedComplexMessageFieldPrefixDefault,
exportSettings: newDefaultExportSettings(),
}
}

Expand Down
27 changes: 25 additions & 2 deletions exporter/datasetexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ func TestConfigUseDefaults(t *testing.T) {
assert.Equal(t, "secret", string(config.APIKey))
assert.Equal(t, bufferMaxLifetime, config.MaxLifetime)
assert.Equal(t, logsExportResourceInfoDefault, config.LogsSettings.ExportResourceInfo)
assert.Equal(t, logsExportResourcePrefixDefault, config.LogsSettings.ExportResourcePrefix)
assert.Equal(t, logsExportScopeInfoDefault, config.LogsSettings.ExportScopeInfo)
assert.Equal(t, logsExportScopePrefixDefault, config.LogsSettings.ExportScopePrefix)
assert.Equal(t, logsDecomposeComplexMessageFieldDefault, config.LogsSettings.DecomposeComplexMessageField)
assert.Equal(t, exportSeparatorDefault, config.LogsSettings.exportSettings.ExportSeparator)
assert.Equal(t, exportDistinguishingSuffix, config.LogsSettings.exportSettings.ExportDistinguishingSuffix)
assert.Equal(t, exportSeparatorDefault, config.TracesSettings.exportSettings.ExportSeparator)
assert.Equal(t, exportDistinguishingSuffix, config.TracesSettings.exportSettings.ExportDistinguishingSuffix)
}

func TestConfigValidate(t *testing.T) {
Expand Down Expand Up @@ -106,7 +112,24 @@ func TestConfigString(t *testing.T) {
MaxLifetime: 123,
GroupBy: []string{"field1", "field2"},
},
TracesSettings: TracesSettings{},
TracesSettings: TracesSettings{
exportSettings: exportSettings{
ExportSeparator: "TTT",
ExportDistinguishingSuffix: "UUU",
},
},
LogsSettings: LogsSettings{
ExportResourceInfo: true,
ExportResourcePrefix: "AAA",
ExportScopeInfo: true,
ExportScopePrefix: "BBB",
DecomposeComplexMessageField: true,
DecomposedComplexMessagePrefix: "EEE",
exportSettings: exportSettings{
ExportSeparator: "CCC",
ExportDistinguishingSuffix: "DDD",
},
},
ServerHostSettings: ServerHostSettings{
ServerHost: "foo-bar",
UseHostName: false,
Expand All @@ -117,7 +140,7 @@ func TestConfigString(t *testing.T) {
}

assert.Equal(t,
"DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:false ExportScopeInfo:false DecomposeComplexMessageField:false}; TracesSettings: {}; ServerHostSettings: {UseHostName:false ServerHost:foo-bar}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
"DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:true ExportResourcePrefix:AAA ExportScopeInfo:true ExportScopePrefix:BBB DecomposeComplexMessageField:true DecomposedComplexMessagePrefix:EEE exportSettings:{ExportSeparator:CCC ExportDistinguishingSuffix:DDD}}; TracesSettings: {exportSettings:{ExportSeparator:TTT ExportDistinguishingSuffix:UUU}}; ServerHostSettings: {UseHostName:false ServerHost:foo-bar}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
config.String(),
)
}
Expand Down
41 changes: 23 additions & 18 deletions exporter/datasetexporter/datasetexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,42 +86,47 @@ func buildKey(prefix string, separator string, key string, depth int) string {
return res
}

func updateWithPrefixedValuesMap(target map[string]interface{}, prefix string, separator string, source map[string]interface{}, depth int) {
func updateWithPrefixedValuesMap(target map[string]interface{}, prefix string, separator string, suffix string, source map[string]interface{}, depth int) {
for k, v := range source {
key := buildKey(prefix, separator, k, depth)
updateWithPrefixedValues(target, key, separator, v, depth+1)
updateWithPrefixedValues(target, key, separator, suffix, v, depth+1)
}
}

func updateWithPrefixedValuesArray(target map[string]interface{}, prefix string, separator string, source []interface{}, depth int) {
func updateWithPrefixedValuesArray(target map[string]interface{}, prefix string, separator string, suffix string, source []interface{}, depth int) {
for i, v := range source {
key := buildKey(prefix, separator, strconv.FormatInt(int64(i), 10), depth)
updateWithPrefixedValues(target, key, separator, v, depth+1)
updateWithPrefixedValues(target, key, separator, suffix, v, depth+1)
}
}

func updateWithPrefixedValues(target map[string]interface{}, prefix string, separator string, source interface{}, depth int) {
st := reflect.TypeOf(source)
if st == nil {
target[prefix] = source
return
}
switch st.Kind() {
case reflect.Map:
updateWithPrefixedValuesMap(target, prefix, separator, source.(map[string]interface{}), depth)
case reflect.Array, reflect.Slice:
updateWithPrefixedValuesArray(target, prefix, separator, source.([]interface{}), depth)
default:
func updateWithPrefixedValues(target map[string]interface{}, prefix string, separator string, suffix string, source interface{}, depth int) {
setValue := func() {
for {
// now the last value wins
// Should the first value win?
_, found := target[prefix]
if found {
prefix += separator
if found && len(suffix) > 0 {
prefix += suffix
} else {
target[prefix] = source
break
}
}
}

st := reflect.TypeOf(source)
if st == nil {
setValue()
return
}
switch st.Kind() {
case reflect.Map:
updateWithPrefixedValuesMap(target, prefix, separator, suffix, source.(map[string]interface{}), depth)
case reflect.Array, reflect.Slice:
updateWithPrefixedValuesArray(target, prefix, separator, suffix, source.([]interface{}), depth)
default:
setValue()
}
}

Expand Down
Loading

0 comments on commit e347ff3

Please sign in to comment.