diff --git a/.chloggen/export-logs-resource-info-based-configuration.yaml b/.chloggen/export-logs-resource-info-based-configuration.yaml
new file mode 100644
index 000000000000..8e4872151206
--- /dev/null
+++ b/.chloggen/export-logs-resource-info-based-configuration.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: datasetexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Allow including Logs resource info in the DataSet export based on the new export_resource_info_on_event configuration. Fix timestamp handling."
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [20660]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/exporter/datasetexporter/README.md b/exporter/datasetexporter/README.md
index 3e2548869915..ceef4497e2ea 100644
--- a/exporter/datasetexporter/README.md
+++ b/exporter/datasetexporter/README.md
@@ -33,6 +33,8 @@ If you do not want to specify `api_key` in the file, you can use the [builtin fu
 - `traces`:
   - `aggregate` (default = false): Count the number of spans and errors belonging to a trace.
   - `max_wait` (default = 5s): The maximum waiting for all spans from single trace to arrive; ignored if `aggregate` is false.
+- `logs`:
+  - `export_resource_info_on_event` (default = false): Include resource info in the DataSet event while exporting Logs. Leaving this disabled is especially useful for reducing DataSet billable log volume.
 - `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
 - `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
 - `timeout`: See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
diff --git a/exporter/datasetexporter/config.go b/exporter/datasetexporter/config.go
index 8eddae82a873..69a2d532b7e2 100644
--- a/exporter/datasetexporter/config.go
+++ b/exporter/datasetexporter/config.go
@@ -32,6 +32,22 @@ func newDefaultTracesSettings() TracesSettings {
     }
 }
 
+const logsExportResourceInfoDefault = false
+
+type LogsSettings struct {
+    // ExportResourceInfo is an optional flag that signals whether resource info should be exported to DataSet while exporting Logs.
+    // Leaving it disabled is especially useful for reducing DataSet billable log volume.
+    // Default value: false.
+    ExportResourceInfo bool `mapstructure:"export_resource_info_on_event"`
+}
+
+// newDefaultLogsSettings returns the default settings for LogsSettings.
+func newDefaultLogsSettings() LogsSettings {
+    return LogsSettings{
+        ExportResourceInfo: logsExportResourceInfoDefault,
+    }
+}
+
 const bufferMaxLifetime = 5 * time.Second
 const bufferRetryInitialInterval = 5 * time.Second
 const bufferRetryMaxInterval = 30 * time.Second
@@ -61,6 +77,7 @@ type Config struct {
     APIKey         configopaque.String `mapstructure:"api_key"`
     BufferSettings `mapstructure:"buffer"`
     TracesSettings `mapstructure:"traces"`
+    LogsSettings   `mapstructure:"logs"`
     exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`
     exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
     exporterhelper.TimeoutSettings `mapstructure:"timeout"`
@@ -96,7 +113,8 @@ func (c *Config) String() string {
     s += fmt.Sprintf("%s: %+v; ", "TracesSettings", c.TracesSettings)
     s += fmt.Sprintf("%s: %+v; ", "RetrySettings", c.RetrySettings)
     s += fmt.Sprintf("%s: %+v; ", "QueueSettings", c.QueueSettings)
-    s += fmt.Sprintf("%s: %+v", "TimeoutSettings", c.TimeoutSettings)
+    s += fmt.Sprintf("%s: %+v; ", "TimeoutSettings", c.TimeoutSettings)
+    s += fmt.Sprintf("%s: %+v", "LogsSettings", c.LogsSettings)
 
     return s
 }
@@ -123,6 +141,7 @@ func (c *Config) convert() (*ExporterConfig, error) {
             },
         },
         tracesSettings: c.TracesSettings,
+        logsSettings:   c.LogsSettings,
     }, nil
 }
 
@@ -130,4 +149,5 @@
 type ExporterConfig struct {
     datasetConfig  *datasetConfig.DataSetConfig
     tracesSettings TracesSettings
+    logsSettings   LogsSettings
 }
diff --git a/exporter/datasetexporter/config_test.go b/exporter/datasetexporter/config_test.go
index 66c38e6d09d4..8bd938926da0 100644
--- a/exporter/datasetexporter/config_test.go
+++ b/exporter/datasetexporter/config_test.go
@@ -43,6 +43,7 @@ func TestConfigUseDefaults(t *testing.T) {
     assert.Equal(t, "secret", string(config.APIKey))
     assert.Equal(t, bufferMaxLifetime, config.MaxLifetime)
     assert.Equal(t, tracesMaxWait, config.TracesSettings.MaxWait)
+    assert.Equal(t, logsExportResourceInfoDefault, config.LogsSettings.ExportResourceInfo)
 }
 
 func TestConfigValidate(t *testing.T) {
@@ -114,7 +115,22 @@ func TestConfigString(t *testing.T) {
     }
 
     assert.Equal(t,
-        "DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:true MaxWait:45s}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:}; TimeoutSettings: {Timeout:5s}",
+        "DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:true MaxWait:45s}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:}; TimeoutSettings: {Timeout:5s}; LogsSettings: {ExportResourceInfo:false}",
         config.String(),
     )
 }
+
+func TestConfigUseProvidedExportResourceInfoValue(t *testing.T) {
+    f := NewFactory()
+    config := f.CreateDefaultConfig().(*Config)
+    configMap := confmap.NewFromStringMap(map[string]interface{}{
+        "dataset_url": "https://example.com",
+        "api_key":     "secret",
+        "logs": map[string]any{
+            "export_resource_info_on_event": true,
+        },
+    })
+    err := config.Unmarshal(configMap)
+    assert.Nil(t, err)
+    assert.Equal(t, true, config.LogsSettings.ExportResourceInfo)
+}
diff --git a/exporter/datasetexporter/datasetexporter.go b/exporter/datasetexporter/datasetexporter.go
index 0c73d16dd108..6bbbc88b1521 100644
--- a/exporter/datasetexporter/datasetexporter.go
+++ b/exporter/datasetexporter/datasetexporter.go
@@ -24,6 +24,7 @@ type DatasetExporter struct {
     logger      *zap.Logger
     session     string
     spanTracker *spanTracker
+    exporterCfg *ExporterConfig
 }
 
 func newDatasetExporter(entity string, config *Config, logger *zap.Logger) (*DatasetExporter, error) {
@@ -60,6 +61,7 @@ func newDatasetExporter(entity string, config *Config, logger *zap.Logger) (*Dat
         session:     uuid.New().String(),
         logger:      logger,
         spanTracker: tracker,
+        exporterCfg: exporterCfg,
     }, nil
 }
 
diff --git a/exporter/datasetexporter/factory.go b/exporter/datasetexporter/factory.go
index 24e067d9b2d9..b76b3bf4b519 100644
--- a/exporter/datasetexporter/factory.go
+++ b/exporter/datasetexporter/factory.go
@@ -27,6 +27,7 @@ func createDefaultConfig() component.Config {
     return &Config{
         BufferSettings:  newDefaultBufferSettings(),
         TracesSettings:  newDefaultTracesSettings(),
+        LogsSettings:    newDefaultLogsSettings(),
         RetrySettings:   exporterhelper.NewDefaultRetrySettings(),
         QueueSettings:   exporterhelper.NewDefaultQueueSettings(),
         TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
diff --git a/exporter/datasetexporter/factory_test.go b/exporter/datasetexporter/factory_test.go
index 77a4d3a288c8..6d2df780a697 100644
--- a/exporter/datasetexporter/factory_test.go
+++ b/exporter/datasetexporter/factory_test.go
@@ -49,6 +49,7 @@ func TestLoadConfig(t *testing.T) {
                 APIKey:          "key-minimal",
                 BufferSettings:  newDefaultBufferSettings(),
                 TracesSettings:  newDefaultTracesSettings(),
+                LogsSettings:    newDefaultLogsSettings(),
                 RetrySettings:   exporterhelper.NewDefaultRetrySettings(),
                 QueueSettings:   exporterhelper.NewDefaultQueueSettings(),
                 TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
@@ -67,6 +68,7 @@ func TestLoadConfig(t *testing.T) {
                     RetryMaxElapsedTime:  bufferRetryMaxElapsedTime,
                 },
                 TracesSettings:  newDefaultTracesSettings(),
+                LogsSettings:    newDefaultLogsSettings(),
                 RetrySettings:   exporterhelper.NewDefaultRetrySettings(),
                 QueueSettings:   exporterhelper.NewDefaultQueueSettings(),
                 TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
@@ -87,6 +89,9 @@ func TestLoadConfig(t *testing.T) {
                 TracesSettings: TracesSettings{
                     MaxWait: 3 * time.Second,
                 },
+                LogsSettings: LogsSettings{
+                    ExportResourceInfo: true,
+                },
                 RetrySettings: exporterhelper.RetrySettings{
                     Enabled:             true,
                     InitialInterval:     11 * time.Nanosecond,
@@ -133,7 +138,7 @@ func createExporterTests() []CreateTest {
         {
             name:          "broken",
             config:        &Config{},
-            expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:false MaxWait:0s}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:}; TimeoutSettings: {Timeout:0s}; config is not valid: api_key is required"),
+            expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:false MaxWait:0s}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:}; TimeoutSettings: {Timeout:0s}; LogsSettings: {ExportResourceInfo:false}; config is not valid: api_key is required"),
         },
         {
             name: "valid",
diff --git a/exporter/datasetexporter/logs_exporter.go b/exporter/datasetexporter/logs_exporter.go
index 2912f8a50fa0..2aed24b9d027 100644
--- a/exporter/datasetexporter/logs_exporter.go
+++ b/exporter/datasetexporter/logs_exporter.go
@@ -17,6 +17,8 @@ import (
     "go.opentelemetry.io/collector/pdata/plog"
 )
 
+var now = time.Now
+
 func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Logs, error) {
     cfg := castConfig(config)
     e, err := newDatasetExporter("logs", cfg, set.Logger)
@@ -63,17 +65,22 @@ func buildBody(attrs map[string]interface{}, value pcommon.Value) string {
     return message
 }
 
-func buildEventFromLog(log plog.LogRecord, resource pcommon.Resource, scope pcommon.InstrumentationScope) *add_events.EventBundle {
+func buildEventFromLog(
+    log plog.LogRecord,
+    resource pcommon.Resource,
+    scope pcommon.InstrumentationScope,
+    settings LogsSettings,
+) *add_events.EventBundle {
     attrs := make(map[string]interface{})
     event := add_events.Event{}
 
+    observedTs := log.ObservedTimestamp().AsTime()
     if sevNum := log.SeverityNumber(); sevNum > 0 {
         attrs["severity.number"] = sevNum
         event.Sev = int(sevNum)
     }
 
     if timestamp := log.Timestamp().AsTime(); !timestamp.Equal(time.Unix(0, 0)) {
-        attrs["timestamp"] = timestamp.String()
         event.Ts = strconv.FormatInt(timestamp.UnixNano(), 10)
     }
 
@@ -86,8 +93,8 @@ func buildEventFromLog(log plog.LogRecord, resource pcommon.Resource, scope pcom
     if dropped := log.DroppedAttributesCount(); dropped > 0 {
         attrs["dropped_attributes_count"] = dropped
     }
-    if observed := log.ObservedTimestamp().AsTime(); !observed.Equal(time.Unix(0, 0)) {
-        attrs["observed.timestamp"] = observed.String()
+    if !observedTs.Equal(time.Unix(0, 0)) {
+        attrs["observed.timestamp"] = observedTs.String()
     }
     if sevText := log.SeverityText(); sevText != "" {
         attrs["severity.text"] = sevText
@@ -100,11 +107,27 @@ func buildEventFromLog(log plog.LogRecord, resource pcommon.Resource, scope pcom
         attrs["trace_id"] = trace
     }
 
+    // The event always needs a timestamp set, otherwise it defaults to the Unix epoch start time.
+    if event.Ts == "" {
+        // ObservedTimestamp should always be set, but in case it's not, we fall back to
+        // the current time.
+        // TODO: We should probably also emit a rate-limited log message here, since this
+        // could indicate an issue with the current setup if most events don't contain
+        // a timestamp.
+        if !observedTs.Equal(time.Unix(0, 0)) {
+            event.Ts = strconv.FormatInt(observedTs.UnixNano(), 10)
+        } else {
+            event.Ts = strconv.FormatInt(now().UnixNano(), 10)
+        }
+    }
+
     updateWithPrefixedValues(attrs, "attributes.", ".", log.Attributes().AsRaw(), 0)
     attrs["flags"] = log.Flags()
     attrs["flag.is_sampled"] = log.Flags().IsSampled()
 
-    updateWithPrefixedValues(attrs, "resource.attributes.", ".", resource.Attributes().AsRaw(), 0)
+    if settings.ExportResourceInfo {
+        updateWithPrefixedValues(attrs, "resource.attributes.", ".", resource.Attributes().AsRaw(), 0)
+    }
     attrs["scope.name"] = scope.Name()
     updateWithPrefixedValues(attrs, "scope.attributes.", ".", scope.Attributes().AsRaw(), 0)
 
@@ -130,7 +153,7 @@ func (e *DatasetExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
             logRecords := scopeLogs.At(j).LogRecords()
             for k := 0; k < logRecords.Len(); k++ {
                 logRecord := logRecords.At(k)
-                events = append(events, buildEventFromLog(logRecord, resource, scope))
+                events = append(events, buildEventFromLog(logRecord, resource, scope, e.exporterCfg.logsSettings))
             }
         }
     }
diff --git a/exporter/datasetexporter/logs_exporter_test.go b/exporter/datasetexporter/logs_exporter_test.go
index e9530839f74e..e61d42932b3a 100644
--- a/exporter/datasetexporter/logs_exporter_test.go
+++ b/exporter/datasetexporter/logs_exporter_test.go
@@ -11,6 +11,7 @@ import (
     "io"
     "net/http"
     "net/http/httptest"
+    "strconv"
     "sync/atomic"
     "testing"
     "time"
@@ -158,21 +159,19 @@ var testLEventRaw = &add_events.Event{
     Sev: 9,
     Ts:  "1581452773000000789",
     Attrs: map[string]interface{}{
-        "attributes.app":                    "server",
-        "attributes.instance_num":           int64(1),
-        "body.str":                          "This is a log message",
-        "body.type":                         "Str",
-        "dropped_attributes_count":          uint32(1),
-        "flag.is_sampled":                   false,
-        "flags":                             plog.LogRecordFlags(0),
-        "message":                           "OtelExporter - Log - This is a log message",
-        "resource.attributes.resource-attr": "resource-attr-val-1",
-        "scope.name":                        "",
-        "severity.number":                   plog.SeverityNumberInfo,
-        "severity.text":                     "Info",
-        "span_id":                           "0102040800000000",
-        "timestamp":                         "2020-02-11 20:26:13.000000789 +0000 UTC",
-        "trace_id":                          "08040201000000000000000000000000",
+        "attributes.app":           "server",
+        "attributes.instance_num":  int64(1),
+        "body.str":                 "This is a log message",
+        "body.type":                "Str",
+        "dropped_attributes_count": uint32(1),
+        "flag.is_sampled":          false,
+        "flags":                    plog.LogRecordFlags(0),
+        "message":                  "OtelExporter - Log - This is a log message",
+        "scope.name":               "",
+        "severity.number":          plog.SeverityNumberInfo,
+        "severity.text":            "Info",
+        "span_id":                  "0102040800000000",
+        "trace_id":                 "08040201000000000000000000000000",
     },
 }
 
@@ -182,22 +181,20 @@ var testLEventReq = &add_events.Event{
     Sev: testLEventRaw.Sev,
     Ts:  testLEventRaw.Ts,
     Attrs: map[string]interface{}{
-        "attributes.app":                    "server",
-        "attributes.instance_num":           float64(1),
-        "body.str":                          "This is a log message",
-        "body.type":                         "Str",
-        "dropped_attributes_count":          float64(1),
-        "flag.is_sampled":                   false,
-        "flags":                             float64(plog.LogRecordFlags(0)),
-        "message":                           "OtelExporter - Log - This is a log message",
-        "resource.attributes.resource-attr": "resource-attr-val-1",
-        "scope.name":                        "",
-        "severity.number":                   float64(plog.SeverityNumberInfo),
-        "severity.text":                     "Info",
-        "span_id":                           "0102040800000000",
-        "timestamp":                         "2020-02-11 20:26:13.000000789 +0000 UTC",
-        "trace_id":                          "08040201000000000000000000000000",
-        "bundle_key":                        "d41d8cd98f00b204e9800998ecf8427e",
+        "attributes.app":           "server",
+        "attributes.instance_num":  float64(1),
+        "body.str":                 "This is a log message",
+        "body.type":                "Str",
+        "dropped_attributes_count": float64(1),
+        "flag.is_sampled":          false,
+        "flags":                    float64(plog.LogRecordFlags(0)),
+        "message":                  "OtelExporter - Log - This is a log message",
+        "scope.name":               "",
+        "severity.number":          float64(plog.SeverityNumberInfo),
+        "severity.text":            "Info",
+        "span_id":                  "0102040800000000",
+        "trace_id":                 "08040201000000000000000000000000",
+        "bundle_key":               "d41d8cd98f00b204e9800998ecf8427e",
     },
 }
 
@@ -224,6 +221,102 @@ func TestBuildEventFromLog(t *testing.T) {
         ld,
         lr.ResourceLogs().At(0).Resource(),
         lr.ResourceLogs().At(0).ScopeLogs().At(0).Scope(),
+        newDefaultLogsSettings(),
+    )
+
+    assert.Equal(t, expected, was)
+}
+
+func TestBuildEventFromLogExportResources(t *testing.T) {
+    lr := testdata.GenerateLogsOneLogRecord()
+    ld := lr.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+
+    defaultAttrs := testLEventRaw.Attrs
+    defaultAttrs["resource.attributes.resource-attr"] = "resource-attr-val-1"
+
+    expected := &add_events.EventBundle{
+        Event: &add_events.Event{
+            Thread: testLEventRaw.Thread,
+            Log:    testLEventRaw.Log,
+            Sev:    testLEventRaw.Sev,
+            Ts:     testLEventRaw.Ts,
+            Attrs:  defaultAttrs,
+        },
+        Thread: testLThread,
+        Log:    testLLog,
+    }
+    was := buildEventFromLog(
+        ld,
+        lr.ResourceLogs().At(0).Resource(),
+        lr.ResourceLogs().At(0).ScopeLogs().At(0).Scope(),
+        LogsSettings{
+            ExportResourceInfo: true,
+        },
+    )
+
+    assert.Equal(t, expected, was)
+}
+
+func TestBuildEventFromLogEventWithoutTimestampWithObservedTimestampUseObservedTimestamp(t *testing.T) {
+    // When the LogRecord doesn't have a timestamp set, but it has ObservedTimestamp set,
+    // ObservedTimestamp should be used.
+    lr := testdata.GenerateLogsOneLogRecord()
+    ld := lr.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+
+    ld.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 0)))
+    ld.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Unix(1686235113, 0)))
+
+    testLEventRaw.Ts = "1686235113000000000"
+    testLEventRaw.Attrs["observed.timestamp"] = "2023-06-08 14:38:33 +0000 UTC"
+    delete(testLEventRaw.Attrs, "timestamp")
+    delete(testLEventRaw.Attrs, "resource.attributes.resource-attr")
+
+    expected := &add_events.EventBundle{
+        Event:  testLEventRaw,
+        Thread: testLThread,
+        Log:    testLLog,
+    }
+    was := buildEventFromLog(
+        ld,
+        lr.ResourceLogs().At(0).Resource(),
+        lr.ResourceLogs().At(0).ScopeLogs().At(0).Scope(),
+        newDefaultLogsSettings(),
+    )
+
+    assert.Equal(t, expected, was)
+}
+
+func TestBuildEventFromLogEventWithoutTimestampWithOutObservedTimestampUseCurrentTimestamp(t *testing.T) {
+    // When the LogRecord has neither a timestamp nor ObservedTimestamp set, the current
+    // timestamp should be used.
+    // We mock the current time to ensure stability across runs.
+
+    now = func() time.Time { return time.Unix(123456789, 0) }
+    currentTime := now()
+    assert.Equal(t, currentTime, time.Unix(123456789, 0))
+    assert.Equal(t, strconv.FormatInt(currentTime.UnixNano(), 10), "123456789000000000")
+
+    lr := testdata.GenerateLogsOneLogRecord()
+    ld := lr.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+
+    ld.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 0)))
+    ld.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 0)))
+
+    testLEventRaw.Ts = strconv.FormatInt(currentTime.UnixNano(), 10)
+    delete(testLEventRaw.Attrs, "timestamp")
+    delete(testLEventRaw.Attrs, "observed.timestamp")
+    delete(testLEventRaw.Attrs, "resource.attributes.resource-attr")
+
+    expected := &add_events.EventBundle{
+        Event:  testLEventRaw,
+        Thread: testLThread,
+        Log:    testLLog,
+    }
+    was := buildEventFromLog(
+        ld,
+        lr.ResourceLogs().At(0).Resource(),
+        lr.ResourceLogs().At(0).ScopeLogs().At(0).Scope(),
+        newDefaultLogsSettings(),
     )
 
     assert.Equal(t, expected, was)
diff --git a/exporter/datasetexporter/testdata/config.yaml b/exporter/datasetexporter/testdata/config.yaml
index fa6b2c5c2697..61360b810919 100644
--- a/exporter/datasetexporter/testdata/config.yaml
+++ b/exporter/datasetexporter/testdata/config.yaml
@@ -25,6 +25,8 @@ dataset/full:
     retry_max_elapsed_time: 23s
   traces:
     max_wait: 3s
+  logs:
+    export_resource_info_on_event: true
   retry_on_failure:
     enabled: true
     initial_interval: 11
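
The README entry and the `testdata/config.yaml` hunk above describe the new `logs` settings block. For reference, a minimal collector exporter configuration enabling the flag might look like the sketch below; the `dataset_url` and `api_key` values are placeholders, and only the `logs` block exercises the option added in this change.

```yaml
exporters:
  dataset:
    # Placeholder endpoint and key; substitute real values.
    dataset_url: https://app.scalyr.com
    api_key: your-api-key
    logs:
      # When true, resource attributes are added to every DataSet event under
      # resource.attributes.*; the default (false) omits them, which helps keep
      # DataSet billable log volume down.
      export_resource_info_on_event: true
```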
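The timestamp handling fixed here follows a simple precedence: the log record timestamp if present, otherwise the observed timestamp, otherwise the injectable `now()` that the tests override for determinism. The following standalone Go sketch, not the exporter's actual code, illustrates that precedence under those assumptions.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// pickEventTimestamp mirrors the fallback order introduced in buildEventFromLog:
// record timestamp -> observed timestamp -> current time, so an event never
// silently defaults to the Unix epoch.
func pickEventTimestamp(ts, observedTs time.Time, now func() time.Time) string {
	epoch := time.Unix(0, 0)
	if !ts.Equal(epoch) {
		return strconv.FormatInt(ts.UnixNano(), 10)
	}
	if !observedTs.Equal(epoch) {
		return strconv.FormatInt(observedTs.UnixNano(), 10)
	}
	return strconv.FormatInt(now().UnixNano(), 10)
}

func main() {
	// No record timestamp, but an observed timestamp: the observed value wins.
	fmt.Println(pickEventTimestamp(time.Unix(0, 0), time.Unix(1686235113, 0), time.Now))

	// Neither timestamp set: the injectable now() is used, which is what the
	// tests override to keep the expected value stable.
	fixedNow := func() time.Time { return time.Unix(123456789, 0) }
	fmt.Println(pickEventTimestamp(time.Unix(0, 0), time.Unix(0, 0), fixedNow))
}
```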