Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Add OTel mapping mode for metrics #34248

Merged
merged 35 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5815374
Add metrics grouping for otel mode
carsonip Jul 25, 2024
c816d4d
Add test
carsonip Jul 25, 2024
50a46fa
Add hack
carsonip Jul 25, 2024
786d0f0
Emit dynamic templates
carsonip Jul 25, 2024
4d03f98
Fix dynamic template name
carsonip Jul 26, 2024
98f5d7c
Workaround tsdb not supporting bool dimension
carsonip Jul 26, 2024
9d806e9
Temporarily use start_time instead of start_timestamp
carsonip Jul 26, 2024
2b5c24f
Revert to use start_timestamp
carsonip Jul 26, 2024
c8663a0
Add complex object to fix histogram falling into doc _ignored
carsonip Jul 29, 2024
fdcf8ca
Only emit start_timestamp when it is non-zero
carsonip Jul 29, 2024
91e10ae
Fix otel test
carsonip Jul 29, 2024
76ad969
Add sum to test
carsonip Jul 29, 2024
552a0b2
Fix tests
carsonip Jul 29, 2024
53b0253
Revert "Workaround tsdb not supporting bool dimension"
carsonip Jul 30, 2024
51bca38
Merge branch 'main' into otel-mode-metrics
carsonip Aug 12, 2024
8ded84c
Bump go-docappender to v2.3.0
carsonip Aug 12, 2024
497fbb5
go mod tidy
carsonip Aug 12, 2024
bcae04a
Remove go mod replace
carsonip Aug 12, 2024
6246e32
Fix NumberDataPointValueTypeEmpty dynamicTemplates
carsonip Aug 12, 2024
c48624f
Rename to KindUnflattenableObject
carsonip Aug 12, 2024
66b3cf4
Merge branch 'main' into otel-mode-metrics
carsonip Aug 12, 2024
0190df2
Fix typo
carsonip Aug 13, 2024
2998221
Fix quantiles
carsonip Aug 13, 2024
ff9d86a
Dynamically map summary
carsonip Aug 13, 2024
8ccf3a1
Use summary_metrics instead of summary_gauge
carsonip Aug 13, 2024
88128ac
Add FIXME
carsonip Aug 13, 2024
7086ea3
Exclude DS attr in metric hash
carsonip Aug 13, 2024
e371e53
Fix test
carsonip Aug 13, 2024
69ac0b3
Change quantiles from FIXME to TODO
carsonip Aug 13, 2024
dd1a70e
Merge branch 'main' into otel-mode-metrics
carsonip Aug 13, 2024
03228a4
Add changelog
carsonip Aug 13, 2024
2290cc0
Revert go mod diff
carsonip Aug 13, 2024
5c7d991
Make linter happy
carsonip Aug 13, 2024
20ed43f
Make ECS use mapHashExcludeDataStreamAttr
carsonip Aug 14, 2024
bd7cbd7
Describe metricDpToDynamicTemplate
carsonip Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add metrics grouping for otel mode
  • Loading branch information
carsonip committed Jul 25, 2024
commit 58153745fa416c88465b30e11f623b2f5cbeb089
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
return nil
}

// FIXME: metrics.* as well
// Set of prefixes for the OTel attributes that needs to stay flattened
var otelPrefixSet = map[string]struct{}{
"attributes.": {},
Expand Down
107 changes: 107 additions & 0 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type encodeModel struct {

type dataPoint interface {
Timestamp() pcommon.Timestamp
StartTimestamp() pcommon.Timestamp
Attributes() pcommon.Map
}

Expand Down Expand Up @@ -321,6 +322,94 @@ func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.D
return nil
}

func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaUrl string, scope pcommon.InstrumentationScope, scopeSchemaUrl string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
// documents is per-resource. Therefore, there is no need to hash resource attributes
hash := metricOTelHash(dp, scope.Attributes(), metric.Unit())
var (
document objmodel.Document
ok bool
)
if document, ok = documents[hash]; !ok {
document.AddTimestamp("@timestamp", dp.Timestamp())
document.AddTimestamp("start_timestamp", dp.StartTimestamp())
document.AddString("unit", metric.Unit())

// At this point the data_stream attributes are expected to be in the record attributes,
// updated by the router.
// Move them to the top of the document and remove them from the record
attributeMap := dp.Attributes()

forEachDataStreamKey := func(fn func(key string)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactoring done in #34472

for _, key := range datastreamKeys {
fn(key)
}
}

forEachDataStreamKey(func(key string) {
if value, exists := attributeMap.Get(key); exists {
document.AddAttribute(key, value)
attributeMap.Remove(key)
}
})

document.AddAttributes("attributes", attributeMap)

// Resource
resourceMapVal := pcommon.NewValueMap()
resourceMap := resourceMapVal.Map()
resourceMap.PutStr("schema_url", resourceSchemaUrl)
resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount()))
resourceAttrMap := resourceMap.PutEmptyMap("attributes")

resource.Attributes().CopyTo(resourceAttrMap)

// Remove data_stream attributes from the resources attributes if present
forEachDataStreamKey(func(key string) {
resourceAttrMap.Remove(key)
})

document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal))

// Scope
scopeMapVal := pcommon.NewValueMap()
scopeMap := scopeMapVal.Map()
if scope.Name() != "" {
scopeMap.PutStr("name", scope.Name())
}
if scope.Version() != "" {
scopeMap.PutStr("version", scope.Version())
}
if scopeSchemaUrl != "" {
scopeMap.PutStr("schema_url", scopeSchemaUrl)
}
if scope.DroppedAttributesCount() > 0 {
scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount()))
}
scopeAttributes := scope.Attributes()
if scopeAttributes.Len() > 0 {
scopeAttrMap := scopeMap.PutEmptyMap("attributes")
scopeAttributes.CopyTo(scopeAttrMap)

// Remove data_stream attributes from the scope attributes if present
forEachDataStreamKey(func(key string) {
scopeAttrMap.Remove(key)
})
}

if scopeMap.Len() > 0 {
document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal))
}
}

metricsMap := pcommon.NewValueMap()
v := metricsMap.SetEmptyMap().PutEmpty(metric.Name())
value.CopyTo(v)
document.Add("metrics", objmodel.ValueFromAttribute(metricsMap))

documents[hash] = document
return nil
}

func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
// Histogram conversion function is from
// https://github.com/elastic/apm-data/blob/3b28495c3cbdc0902983134276eb114231730249/input/otlp/metrics.go#L277
Expand Down Expand Up @@ -586,6 +675,24 @@ func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 {
return hasher.Sum32()
}

func metricOTelHash(dp dataPoint, scopeAttrs pcommon.Map, unit string) uint32 {
hasher := fnv.New32a()

timestampBuf := make([]byte, 8)
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
hasher.Write(timestampBuf)

binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.StartTimestamp()))
hasher.Write(timestampBuf)

hasher.Write([]byte(unit))

mapHash(hasher, scopeAttrs)
mapHash(hasher, dp.Attributes())

return hasher.Sum32()
}

func mapHash(hasher hash.Hash, m pcommon.Map) {
m.Range(func(k string, v pcommon.Value) bool {
hasher.Write([]byte(k))
Expand Down
Loading