Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [processor/interval] Implement the main logic #30827

Closed
wants to merge 11 commits into from
Next Next commit
[internal/exp/metrics] Add a new internal package for handling metric…
… staleness

It's a glorified wrapper over a Map type, which allows values to be expired based on a pre-supplied interval.
  • Loading branch information
RichieSams committed Feb 21, 2024
commit 064769e01e931af7bb08fa218da151613c98815f
8 changes: 6 additions & 2 deletions internal/exp/metrics/go.mod
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
go 1.20

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/pdata v1.2.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.21
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil

toolchain go1.21.1
5 changes: 5 additions & 0 deletions internal/exp/metrics/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 97 additions & 0 deletions internal/exp/metrics/staleness/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"container/heap"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type priorityQueueImpl []*queueItem

type queueItem struct {
key identity.Stream
prio time.Time
index int
}

func (pq priorityQueueImpl) Len() int { return len(pq) }

func (pq priorityQueueImpl) Less(i, j int) bool {
// We want Pop to give us the lowest priority
return pq[i].prio.Before(pq[j].prio)
}

func (pq priorityQueueImpl) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

func (pq *priorityQueueImpl) Push(x any) {
n := len(*pq)
item := x.(*queueItem)
item.index = n
*pq = append(*pq, item)
}

func (pq *priorityQueueImpl) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}

func (pq *priorityQueueImpl) Update(item *queueItem, newPrio time.Time) {
item.prio = newPrio
heap.Fix(pq, item.index)
}

type PriorityQueue struct {
inner priorityQueueImpl
itemLookup map[identity.Stream]*queueItem
}

func NewPriorityQueue() *PriorityQueue {
pq := &PriorityQueue{
inner: priorityQueueImpl{},
itemLookup: map[identity.Stream]*queueItem{},
}
heap.Init(&pq.inner)

return pq
}

func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) {
item, ok := pq.itemLookup[id]
if !ok {
item = &queueItem{
key: id,
prio: newPrio,
}
heap.Push(&pq.inner, item)
pq.itemLookup[id] = item
} else {
pq.inner.Update(item, newPrio)
}
}

func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) {
val := pq.inner[0]
return val.key, val.prio
}

func (pq *PriorityQueue) Pop() (identity.Stream, time.Time) {
val := heap.Pop(&pq.inner).(*queueItem)
return val.key, val.prio
}

func (pq *PriorityQueue) Len() int {
return pq.inner.Len()
}
111 changes: 111 additions & 0 deletions internal/exp/metrics/staleness/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

func TestPriorityQueueImpl(t *testing.T) {
t.Parallel()

pq := NewPriorityQueue()

idA := generateStreamID(t, map[string]any{
"aaa": "123",
})
idB := generateStreamID(t, map[string]any{
"bbb": "456",
})
idC := generateStreamID(t, map[string]any{
"ccc": "789",
})

initialTime := time.Time{}
prioA := initialTime.Add(2 * time.Second)
prioB := initialTime.Add(1 * time.Second)
prioC := initialTime.Add(3 * time.Second)

pq.Update(idA, prioA)
pq.Update(idB, prioB)
pq.Update(idC, prioC)

// The first item should be B
id, prio := pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// If we peek again, nothing should change
id, prio = pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Now if we peek again, it should be the next item
id, prio = pq.Peek()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// One last time
id, prio = pq.Peek()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// The queue should now be empty
require.Equal(t, 0, pq.Len())
}

func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream {
res := pcommon.NewResource()
err := res.Attributes().FromRaw(map[string]any{
"foo": "bar",
"asdf": "qwer",
})
require.NoError(t, err)

scope := pcommon.NewInstrumentationScope()
scope.SetName("TestScope")
scope.SetVersion("v1.2.3")
err = scope.Attributes().FromRaw(map[string]any{
"aaa": "bbb",
"ccc": "ddd",
})
require.NoError(t, err)

metric := pmetric.NewMetric()

sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

dp := sum.DataPoints().AppendEmpty()
dp.SetStartTimestamp(678)
dp.SetTimestamp(789)
dp.SetDoubleValue(123.456)
err = dp.Attributes().FromRaw(attributes)
require.NoError(t, err)

return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp)
}
55 changes: 55 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// We override how Now() is returned, so we can have deterministic tests
var nowFunc = time.Now

type Map[T any] interface {
Load(key identity.Stream) (T, bool)
Store(key identity.Stream, value T)
Delete(key identity.Stream)
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
Range(f func(key identity.Stream, value T) bool)
}

type Staleness[T any] struct {
max time.Duration

Map[T]
pq *PriorityQueue
}

func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] {
return &Staleness[T]{
max: max,
Map: newMap,
pq: NewPriorityQueue(),
}
}

func (s *Staleness[T]) Store(id identity.Stream, value T) {
s.pq.Update(id, nowFunc())
s.Map.Store(id, value)
}

func (s *Staleness[T]) ExpireOldEntries() {
now := nowFunc()

for {
_, ts := s.pq.Peek()
if now.Sub(ts) < s.max {
break
}
id, _ := s.pq.Pop()
s.Map.Delete(id)
}
}
Loading