Skip to content

Commit

Permalink
Merge pull request #2230 from keboola/petr-hosek-PSGO-834
Browse files Browse the repository at this point in the history
Add telemetry and memory utilities for MirrorMap and MirrorTree
  • Loading branch information
hosekpeter authored Feb 10, 2025
2 parents 20013c8 + a7a0159 commit 97e6093
Show file tree
Hide file tree
Showing 25 changed files with 321 additions and 31 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ services:
# Stream Service
- "8001:8001" # API
- "9001:9001" # metrics
- "8010:8010" # source
# Apps Proxy
- "8002:8002" # Proxy
- "9002:9002" # metrics
Expand Down
75 changes: 74 additions & 1 deletion internal/pkg/service/common/etcdop/watch_mirror_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import (
"context"
"maps"
"sync"
"time"
"unsafe"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/memory"
)

// MirrorMap [T,K, V] is an in memory Go map filled via the etcd Watch API from a RestartableWatchStream[T].
Expand Down Expand Up @@ -71,9 +76,17 @@ func (s MirrorMapSetup[T, K, V]) BuildMirror() *MirrorMap[T, K, V] {
}
}

func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger) (initErr <-chan error) {
// StartMirroring initializes the mirroring process for the MirrorMap by starting a watcher and processing events.
// It locks and updates the internal map on event changes, captures telemetry, and invokes registered callbacks.
// Returns a channel of initialization errors if the consumer fails to start.
func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry) (initErr <-chan error) {
ctx = ctxattr.ContextWith(ctx, attribute.String("stream.prefix", m.stream.WatchedPrefix()))

// Start telemetry collection in a separate goroutine.
// This routine collects metrics about the memory usage and the state of the MirrorMap.
wg.Add(1)
go m.startTelemetryCollection(ctx, wg, logger, tel)

consumer := newConsumerSetup(m.stream).
WithForEach(func(events []WatchEvent[T], header *Header, restart bool) {
update := MirrorUpdate{Header: header, Restart: restart}
Expand Down Expand Up @@ -248,3 +261,63 @@ func (m *MirrorMap[T, K, V]) ForEach(fn func(K, V) (stop bool)) {
}
}
}

func (m *MirrorMap[T, K, V]) recordTelemetry(ctx context.Context, tel telemetry.Telemetry) {
tel.Meter().
IntCounter(
"keboola.go.mirror.map.num.keys",
"Number of keys in the mirror map.",
"count",
).
Add(
ctx,
int64(len(m.mapData)),
metric.WithAttributes(attribute.String("prefix", m.stream.WatchedPrefix())),
)
}

func (m *MirrorMap[T, K, V]) recordMemoryTelemetry(ctx context.Context, tel telemetry.Telemetry) {
// Variable to track memory consumed
var memoryConsumed int64

// Measure base memory usage of the map structure
memoryConsumed += int64(unsafe.Sizeof(*m))

// Lock the map to measure memory usage of its elements
for k, v := range m.mapData {
memoryConsumed += int64(unsafe.Sizeof(k)) // Add key size
memoryConsumed += int64(memory.Size(v)) // Add value size
}

// Emit metrics for the current map memory
meter := tel.Meter()

// Track memory consumed by the map itself
meter.IntCounter(
"keboola.go.mirror.map.memory.usage.bytes",
"Memory consumed by the MirrorMap, including keys and values.",
"bytes",
).Add(ctx, memoryConsumed, metric.WithAttributes(attribute.String("prefix", m.stream.WatchedPrefix())))
}

// Function for periodic telemetry collection (runs as a goroutine).
func (m *MirrorMap[T, K, V]) startTelemetryCollection(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry) {
defer wg.Done()

ticker := time.NewTicker(30 * time.Second) // Emit telemetry every 5 minutes
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Emit telemetry metrics
m.mapLock.RLock()
m.recordTelemetry(ctx, tel)
m.recordMemoryTelemetry(ctx, tel)
m.mapLock.RUnlock()
case <-ctx.Done():
logger.Debugf(ctx, "Telemetry collection stopped: %v", ctx.Err())
return
}
}
}
22 changes: 16 additions & 6 deletions internal/pkg/service/common/etcdop/watch_mirror_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper"
)
Expand All @@ -36,18 +37,21 @@ func TestMirrorMap(t *testing.T) {
// Setup mirroring of the etcd prefix tree to the memory, with custom key and value mapping.
// The result are in-memory KV pairs "<first name> <last name>" => <age>.
logger := log.NewDebugLogger()
tel := telemetry.NewForTest(t)
mirror := SetupMirrorMap[testUser](
pfx.GetAllAndWatch(ctx, client, etcd.WithPrevKV()),
func(key string, value testUser) testUserFullName {
return testUserFullName(value.FirstName + " " + value.LastName)
},
func(key string, value testUser, rawValue *op.KeyValue, oldValue *int) int { return value.Age },
func(key string, value testUser, rawValue *op.KeyValue, oldValue *int) int {
return value.Age
},
).
WithFilter(func(event WatchEvent[testUser]) bool {
return !strings.Contains(event.Kv.String(), "/ignore")
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger)
errCh := mirror.StartMirroring(ctx, wg, logger, tel)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -155,12 +159,15 @@ func TestMirror_WithOnUpdate(t *testing.T) {
// Setup mirroring of the etcd prefix tree to the memory, with custom key and value mapping.
// The result are in-memory KV pairs "<first name> <last name>" => <age>.
logger := log.NewDebugLogger()
tel := telemetry.NewForTest(t)
mirror := SetupMirrorMap[testUser](
pfx.GetAllAndWatch(ctx, client, etcd.WithPrevKV()),
func(key string, value testUser) testUserFullName {
return testUserFullName(value.FirstName + " " + value.LastName)
},
func(key string, value testUser, rawValue *op.KeyValue, oldValue *int) int { return value.Age },
func(key string, value testUser, rawValue *op.KeyValue, oldValue *int) int {
return value.Age
},
).
WithFilter(func(event WatchEvent[testUser]) bool {
return !strings.Contains(event.Kv.String(), "/ignore")
Expand All @@ -169,7 +176,7 @@ func TestMirror_WithOnUpdate(t *testing.T) {
updateCh <- update
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger)
errCh := mirror.StartMirroring(ctx, wg, logger, tel)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -246,12 +253,15 @@ func TestMirrorMap_WithOnChanges(t *testing.T) {
// Setup mirroring of the etcd prefix tree to the memory, with custom key and value mapping.
// The result are in-memory KV pairs "<first name> <last name>" => <age>.
logger := log.NewDebugLogger()
tel := telemetry.NewForTest(t)
mirror := SetupMirrorMap[testUser](
pfx.GetAllAndWatch(ctx, client, etcd.WithPrevKV()),
func(key string, value testUser) testUserFullName {
return testUserFullName(value.FirstName + " " + value.LastName)
},
func(key string, value testUser, rawValue *op.KeyValue, oldValue *int) int { return value.Age },
func(key string, value testUser, rawValue *op.KeyValue, oldValue *int) int {
return value.Age
},
).
WithFilter(func(event WatchEvent[testUser]) bool {
return !strings.Contains(event.Kv.String(), "/ignore")
Expand All @@ -260,7 +270,7 @@ func TestMirrorMap_WithOnChanges(t *testing.T) {
changesCh <- changes
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger)
errCh := mirror.StartMirroring(ctx, wg, logger, tel)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down
76 changes: 75 additions & 1 deletion internal/pkg/service/common/etcdop/watch_mirror_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package etcdop
import (
"context"
"sync"
"time"
"unsafe"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/prefixtree"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/memory"
)

// MirrorTree [T,V] is an in memory AtomicTree filled via the etcd Watch API from a RestartableWatchStream[T].
Expand Down Expand Up @@ -85,9 +90,14 @@ func (s MirrorTreeSetup[T, V]) BuildMirror() *MirrorTree[T, V] {
}
}

func (m *MirrorTree[T, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger) (initErr <-chan error) {
func (m *MirrorTree[T, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry) (initErr <-chan error) {
ctx = ctxattr.ContextWith(ctx, attribute.String("stream.prefix", m.stream.WatchedPrefix()))

wg.Add(1)
// Launching a goroutine to start collecting telemetry data for the MirrorTree.
// This allows asynchronous monitoring of metrics related to the tree's performance and usage.
go m.startTelemetryCollection(ctx, wg, tel, logger)

consumer := newConsumerSetup(m.stream).
WithForEach(func(events []WatchEvent[T], header *Header, restart bool) {
update := MirrorUpdate{Header: header, Restart: restart}
Expand Down Expand Up @@ -161,6 +171,9 @@ func (m *MirrorTree[T, V]) StartMirroring(ctx context.Context, wg *sync.WaitGrou
m.updatedLock.Unlock()
})

m.recordTelemetry(ctx, tel)
m.recordMemoryTelemetry(ctx, tel)

// Call callbacks
for _, fn := range m.onUpdate {
go fn(update)
Expand Down Expand Up @@ -270,3 +283,64 @@ func (m *MirrorTree[T, V]) WalkAll(fn func(key string, value V) (stop bool)) {
func (m *MirrorTree[T, V]) ToMap() map[string]V {
return m.tree.ToMap()
}

// recordTelemetry captures and reports the number of keys in the MirrorTree using the provided telemetry system.
func (m *MirrorTree[T, V]) recordTelemetry(ctx context.Context, tel telemetry.Telemetry) {
tel.Meter().
IntCounter(
"keboola.go.mirror.tree.num.keys",
"Number of keys in the mirror tree.",
"count",
).
Add(
ctx,
int64(m.tree.Len()),
metric.WithAttributes(attribute.String("prefix", m.stream.WatchedPrefix())))
}

func (m *MirrorTree[T, V]) recordMemoryTelemetry(ctx context.Context, tel telemetry.Telemetry) {
// Initialize a variable to track memory allocated for the tree
var memoryConsumed int64

// Measure base memory usage of the tree structure
memoryConsumed += int64(unsafe.Sizeof(*m))

// Measure size of the tree nodes and their elements
m.tree.AtomicReadOnly(func(t prefixtree.TreeReadOnly[V]) {
t.WalkAll(func(key string, value V) (stop bool) {
memoryConsumed += int64(len(key)) // Account for key size
memoryConsumed += int64(memory.Size(value)) // Account for value size
return false
})
})

// Emit telemetry
meter := tel.Meter()

// Gauge for tree memory consumption
meter.IntCounter(
"keboola.go.mirror_tree.memory.usage.bytes",
"Memory consumed by the MirrorTree, including keys and values.",
"bytes",
).Add(ctx, memoryConsumed, metric.WithAttributes(attribute.String("prefix", m.stream.WatchedPrefix())))
}

// startTelemetryCollection begins periodic telemetry reporting for the MirrorTree, including memory usage and key count.
// It runs until the given context is canceled and ensures the provided wait group is marked as done upon completion.
func (m *MirrorTree[T, V]) startTelemetryCollection(ctx context.Context, wg *sync.WaitGroup, tel telemetry.Telemetry, log log.Logger) {
defer wg.Done()

ticker := time.NewTicker(30 * time.Second) // Emit telemetry every 30 seconds
defer ticker.Stop()

for {
select {
case <-ticker.C:
m.recordTelemetry(ctx, tel)
m.recordMemoryTelemetry(ctx, tel)
case <-ctx.Done():
log.Debugf(ctx, "Telemetry collection for tree stopped: %v", ctx.Err())
return
}
}
}
13 changes: 9 additions & 4 deletions internal/pkg/service/common/etcdop/watch_mirror_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper"
)
Expand Down Expand Up @@ -44,6 +45,7 @@ func TestMirrorTree(t *testing.T) {
// Setup mirroring of the etcd prefix tree to the memory, with custom key and value mapping.
// The result are in-memory KV pairs "<first name> <last name>" => <age>.
logger := log.NewDebugLogger()
tel := telemetry.NewForTest(t)
mirror := SetupMirrorTree[testUser](
pfx.GetAllAndWatch(ctx, client, etcd.WithPrevKV()),
func(key string, value testUser) string { return value.FirstName + " " + value.LastName },
Expand All @@ -53,7 +55,7 @@ func TestMirrorTree(t *testing.T) {
return !strings.Contains(event.Kv.String(), "/ignore")
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger)
errCh := mirror.StartMirroring(ctx, wg, logger, tel)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -161,6 +163,7 @@ func TestMirrorTree_WithOnUpdate(t *testing.T) {
// Setup mirroring of the etcd prefix tree to the memory, with custom key and value mapping.
// The result are in-memory KV pairs "<first name> <last name>" => <age>.
logger := log.NewDebugLogger()
tel := telemetry.NewForTest(t)
mirror := SetupMirrorTree[testUser](
pfx.GetAllAndWatch(ctx, client, etcd.WithPrevKV()),
func(key string, value testUser) string { return value.FirstName + " " + value.LastName },
Expand All @@ -173,7 +176,7 @@ func TestMirrorTree_WithOnUpdate(t *testing.T) {
updateCh <- update
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger)
errCh := mirror.StartMirroring(ctx, wg, logger, tel)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -250,6 +253,7 @@ func TestMirrorTree_WithOnChanges(t *testing.T) {
// Setup mirroring of the etcd prefix tree to the memory, with custom key and value mapping.
// The result are in-memory KV pairs "<first name> <last name>" => <age>.
logger := log.NewDebugLogger()
tel := telemetry.NewForTest(t)
mirror := SetupMirrorTree[testUser](
pfx.GetAllAndWatch(ctx, client, etcd.WithPrevKV()),
func(key string, value testUser) string { return value.FirstName + " " + value.LastName },
Expand All @@ -262,7 +266,7 @@ func TestMirrorTree_WithOnChanges(t *testing.T) {
changesCh <- changes
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger)
errCh := mirror.StartMirroring(ctx, wg, logger, tel)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -371,13 +375,14 @@ func TestFullMirrorTree(t *testing.T) {

// Setup full mirroring of the etcd prefix tree to the memory.
logger := log.NewDebugLogger()
tel := telemetry.NewForTest(t)
mirror := SetupFullMirrorTree(
pfx.GetAllAndWatch(ctx, client, etcd.WithPrevKV())).
WithFilter(func(event WatchEvent[testUser]) bool {
return !strings.Contains(event.Kv.String(), "/ignore")
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger)
errCh := mirror.StartMirroring(ctx, wg, logger, tel)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down
Loading

0 comments on commit 97e6093

Please sign in to comment.