Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Add OTel mapping mode for metrics #34248

Merged
merged 35 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5815374
Add metrics grouping for otel mode
carsonip Jul 25, 2024
c816d4d
Add test
carsonip Jul 25, 2024
50a46fa
Add hack
carsonip Jul 25, 2024
786d0f0
Emit dynamic templates
carsonip Jul 25, 2024
4d03f98
Fix dynamic template name
carsonip Jul 26, 2024
98f5d7c
Workaround tsdb not supporting bool dimension
carsonip Jul 26, 2024
9d806e9
Temporarily use start_time instead of start_timestamp
carsonip Jul 26, 2024
2b5c24f
Revert to use start_timestamp
carsonip Jul 26, 2024
c8663a0
Add complex object to fix histogram falling into doc _ignored
carsonip Jul 29, 2024
fdcf8ca
Only emit start_timestamp when it is non-zero
carsonip Jul 29, 2024
91e10ae
Fix otel test
carsonip Jul 29, 2024
76ad969
Add sum to test
carsonip Jul 29, 2024
552a0b2
Fix tests
carsonip Jul 29, 2024
53b0253
Revert "Workaround tsdb not supporting bool dimension"
carsonip Jul 30, 2024
51bca38
Merge branch 'main' into otel-mode-metrics
carsonip Aug 12, 2024
8ded84c
Bump go-docappender to v2.3.0
carsonip Aug 12, 2024
497fbb5
go mod tidy
carsonip Aug 12, 2024
bcae04a
Remove go mod replace
carsonip Aug 12, 2024
6246e32
Fix NumberDataPointValueTypeEmpty dynamicTemplates
carsonip Aug 12, 2024
c48624f
Rename to KindUnflattenableObject
carsonip Aug 12, 2024
66b3cf4
Merge branch 'main' into otel-mode-metrics
carsonip Aug 12, 2024
0190df2
Fix typo
carsonip Aug 13, 2024
2998221
Fix quantiles
carsonip Aug 13, 2024
ff9d86a
Dynamically map summary
carsonip Aug 13, 2024
8ccf3a1
Use summary_metrics instead of summary_gauge
carsonip Aug 13, 2024
88128ac
Add FIXME
carsonip Aug 13, 2024
7086ea3
Exclude DS attr in metric hash
carsonip Aug 13, 2024
e371e53
Fix test
carsonip Aug 13, 2024
69ac0b3
Change quantiles from FIXME to TODO
carsonip Aug 13, 2024
dd1a70e
Merge branch 'main' into otel-mode-metrics
carsonip Aug 13, 2024
03228a4
Add changelog
carsonip Aug 13, 2024
2290cc0
Revert go mod diff
carsonip Aug 13, 2024
5c7d991
Make linter happy
carsonip Aug 13, 2024
20ed43f
Make ECS use mapHashExcludeDataStreamAttr
carsonip Aug 14, 2024
bd7cbd7
Describe metricDpToDynamicTemplate
carsonip Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add test
  • Loading branch information
carsonip committed Jul 25, 2024
commit c816d4da64473ce4f73dae2e4b784d248615c695
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func (e *elasticsearchExporter) pushMetricsData(
resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
}

if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, scope, metric, dp, dpValue); err != nil {
if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource,
resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp, dpValue); err != nil {
return err
}
return nil
Expand Down
44 changes: 44 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,50 @@ func TestExporterMetrics(t *testing.T) {

assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("publish histogram otel mode", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
scopeA := resourceMetrics.ScopeMetrics().AppendEmpty()
metricSlice := scopeA.Metrics()
fooMetric := metricSlice.AppendEmpty()
fooMetric.SetName("metric.foo")
fooDps := fooMetric.SetEmptyHistogram().DataPoints()
fooDp := fooDps.AppendEmpty()
fooDp.ExplicitBounds().FromRaw([]float64{1.0, 2.0, 3.0})
fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})
fooOtherDp := fooDps.AppendEmpty()
fooOtherDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
fooOtherDp.ExplicitBounds().FromRaw([]float64{4.0, 5.0, 6.0})
fooOtherDp.BucketCounts().FromRaw([]uint64{4, 5, 6, 7})

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(2)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default"}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T00:00:00.000000000Z"}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default"}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T00:00:00.000000000Z"}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
})
}

func TestExporterTraces(t *testing.T) {
Expand Down
15 changes: 13 additions & 2 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var resourceAttrsToPreserve = map[string]bool{
type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, dataPoint, pcommon.Value) error
upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint, pcommon.Value) error
encodeDocument(objmodel.Document) ([]byte, error)
}

Expand Down Expand Up @@ -304,7 +304,18 @@ func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error)
return buf.Bytes(), nil
}

func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaUrl string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
switch m.mode {
case MappingECS:
return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaUrl, metric, dp, value)
case MappingOTel:
return m.upsertMetricDataPointValueOTelMode(documents, resource, resourceSchemaURL, scope, scopeSchemaUrl, metric, dp, value)
default:
return errors.New("unsupported mode")
}
}

func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, _ pcommon.InstrumentationScope, scopeSchemaUrl string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
hash := metricHash(dp.Timestamp(), dp.Attributes())
var (
document objmodel.Document
Expand Down
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func TestEncodeMetric(t *testing.T) {
require.NoError(t, err)
err = model.upsertMetricDataPointValue(docs,
metrics.ResourceMetrics().At(0).Resource(),
"",
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(),
"",
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0),
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i),
val)
Expand Down
Loading