Skip to content

Commit

Permalink
[exporter/elasticsearch] Emit _doc_count for metric documents in OTel…
Browse files Browse the repository at this point in the history
… mode when data point attribute _doc_count is true (#35348)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Emit _doc_count for metric documents in OTel mode when data point
attribute _doc_count is true

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
  • Loading branch information
carsonip committed Sep 25, 2024
1 parent 350f4bd commit 24ea6bc
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_otel-mode-doc-count.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Emit _doc_count for metric documents in OTel mode when data point attribute _doc_count is true

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35348]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
47 changes: 47 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,53 @@ func TestExporterMetrics(t *testing.T) {
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("otel mode _doc_count", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

metrics := pmetric.NewMetrics()
resourceMetric := metrics.ResourceMetrics().AppendEmpty()
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()

sumMetric := scopeMetric.Metrics().AppendEmpty()
sumMetric.SetName("sum")
sumDP := sumMetric.SetEmptySum().DataPoints().AppendEmpty()
sumDP.SetIntValue(0)

summaryMetric := scopeMetric.Metrics().AppendEmpty()
summaryMetric.SetName("summary")
summaryDP := summaryMetric.SetEmptySummary().DataPoints().AppendEmpty()
summaryDP.SetSum(1)
summaryDP.SetCount(10)
fillAttributeMap(summaryDP.Attributes(), map[string]any{
"_doc_count": true,
})

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(2)
expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.summary":"summary_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"attributes":{"_doc_count":true},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)

})

t.Run("publish summary", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
Expand Down
24 changes: 24 additions & 0 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type dataPoint interface {
Attributes() pcommon.Map
Value() (pcommon.Value, error)
DynamicTemplate(pmetric.Metric) string
DocCount() uint64
}

const (
Expand Down Expand Up @@ -284,6 +285,7 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob
if err != nil {
return err
}

// documents is per-resource. Therefore, there is no need to hash resource attributes
hash := metricOTelHash(dp, scope.Attributes(), metric.Unit())
var (
Expand All @@ -302,6 +304,12 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
}

// Emit _doc_count if data point contains attribute _doc_count: true
if val, ok := dp.Attributes().Get("_doc_count"); ok && val.Bool() {
docCount := dp.DocCount()
document.AddInt("_doc_count", int64(docCount))
}

switch value.Type() {
case pcommon.ValueTypeMap:
m := pcommon.NewMap()
Expand Down Expand Up @@ -340,6 +348,10 @@ func (dp summaryDataPoint) DynamicTemplate(_ pmetric.Metric) string {
return "summary_metrics"
}

func (dp summaryDataPoint) DocCount() uint64 {
return dp.Count()
}

type exponentialHistogramDataPoint struct {
pmetric.ExponentialHistogramDataPoint
}
Expand Down Expand Up @@ -367,6 +379,10 @@ func (dp exponentialHistogramDataPoint) DynamicTemplate(_ pmetric.Metric) string
return "histogram"
}

func (dp exponentialHistogramDataPoint) DocCount() uint64 {
return dp.Count()
}

type histogramDataPoint struct {
pmetric.HistogramDataPoint
}
Expand All @@ -379,6 +395,10 @@ func (dp histogramDataPoint) DynamicTemplate(_ pmetric.Metric) string {
return "histogram"
}

func (dp histogramDataPoint) DocCount() uint64 {
return dp.HistogramDataPoint.Count()
}

func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
// Histogram conversion function is from
// https://github.com/elastic/apm-data/blob/3b28495c3cbdc0902983134276eb114231730249/input/otlp/metrics.go#L277
Expand Down Expand Up @@ -475,6 +495,10 @@ func (dp numberDataPoint) DynamicTemplate(metric pmetric.Metric) string {
return ""
}

func (dp numberDataPoint) DocCount() uint64 {
return 1
}

var errInvalidNumberDataPoint = errors.New("invalid number data point")

func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resource pcommon.Resource, resourceSchemaURL string) {
Expand Down
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ func fillAttributeMap(attrs pcommon.Map, m map[string]any) {
attrs.EnsureCapacity(len(m))
for k, v := range m {
switch vv := v.(type) {
case bool:
attrs.PutBool(k, vv)
case string:
attrs.PutStr(k, vv)
case []string:
Expand Down

0 comments on commit 24ea6bc

Please sign in to comment.