Skip to content

Commit bddf4a0

Browse files
committed
fix: metric tasks_queue_compaction_tasks_by_hook
1 parent 41bc2c6 commit bddf4a0

File tree

7 files changed

+282
-261
lines changed

7 files changed

+282
-261
lines changed

pkg/metrics/metrics.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ var (
4343
// TasksQueueLength shows the current length of the task queue
4444
TasksQueueLength = "{PREFIX}tasks_queue_length"
4545
// TasksQueueCompactionOperationsTotal counts compaction operations per hook
46-
TasksQueueCompactionOperationsTotal = "{PREFIX}tasks_queue_compaction_operations_total"
47-
// TasksQueueCompactionTasksByHook shows count of tasks per hook in queue (only when count > 20)
48-
TasksQueueCompactionTasksByHook = "{PREFIX}tasks_queue_compaction_tasks_by_hook"
46+
TasksQueueCompactionOperationsTotal = "d8_telemetry_{PREFIX}tasks_queue_compaction_operations_total"
47+
// TasksQueueCompactionTasksByHook shows the number of tasks in queue for each hook when count exceeds 20
48+
TasksQueueCompactionTasksByHook = "d8_telemetry_{PREFIX}tasks_queue_compaction_tasks_by_hook"
4949

5050
// ============================================================================
5151
// Hook Execution Metrics
@@ -103,8 +103,8 @@ func InitMetrics(prefix string) {
103103
// ============================================================================
104104
TasksQueueActionDurationSeconds = ReplacePrefix(TasksQueueActionDurationSeconds, prefix)
105105
TasksQueueLength = ReplacePrefix(TasksQueueLength, prefix)
106-
TasksQueueCompactionInQueueTasks = ReplacePrefix(TasksQueueCompactionInQueueTasks, prefix)
107-
TasksQueueCompactionReached = ReplacePrefix(TasksQueueCompactionReached, prefix)
106+
TasksQueueCompactionOperationsTotal = ReplacePrefix(TasksQueueCompactionOperationsTotal, prefix)
107+
TasksQueueCompactionTasksByHook = ReplacePrefix(TasksQueueCompactionTasksByHook, prefix)
108108

109109
// ============================================================================
110110
// Hook Execution Metrics
@@ -351,10 +351,10 @@ func RegisterTaskQueueMetrics(metricStorage metricsstorage.Storage) error {
351351
return fmt.Errorf("failed to register %s: %w", TasksQueueCompactionOperationsTotal, err)
352352
}
353353

354-
// Register tasks by hook gauge (only when count > 20)
354+
// Register compaction tasks by hook gauge
355355
_, err = metricStorage.RegisterGauge(
356356
TasksQueueCompactionTasksByHook, compactionLabels,
357-
options.WithHelp("Gauge showing count of tasks per hook in queue (only when count > 20)"),
357+
options.WithHelp("Gauge showing the number of tasks in queue for each hook when the count exceeds 20. Updated in real-time as tasks are added/removed and during compaction operations."),
358358
)
359359
if err != nil {
360360
return fmt.Errorf("failed to register %s: %w", TasksQueueCompactionTasksByHook, err)

pkg/task/queue/task_counter.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,22 @@ import (
55

66
metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"
77

8+
"github.com/flant/shell-operator/pkg/metrics"
89
"github.com/flant/shell-operator/pkg/task"
910
)
1011

11-
const taskCap = 100
12+
const (
13+
taskCap = 100
14+
compactionMetricsThreshold = 20
15+
)
1216

1317
type TaskCounter struct {
1418
mu sync.RWMutex
1519

1620
queueName string
1721
counter map[string]uint
1822
reachedCap map[string]struct{}
23+
hookCounter map[string]uint // tracks tasks by hook name
1924
metricStorage metricsstorage.Storage
2025
countableTypes map[task.TaskType]struct{}
2126
}
@@ -29,6 +34,7 @@ func NewTaskCounter(name string, countableTypes map[task.TaskType]struct{}, metr
2934
queueName: name,
3035
counter: make(map[string]uint, 32),
3136
reachedCap: make(map[string]struct{}, 32),
37+
hookCounter: make(map[string]uint, 32),
3238
metricStorage: metricStorage,
3339
countableTypes: countableTypes,
3440
}
@@ -52,7 +58,6 @@ func (tc *TaskCounter) Add(task task.Task) {
5258
}
5359

5460
counter++
55-
5661
tc.counter[id] = counter
5762

5863
if counter == taskCap {
@@ -123,3 +128,36 @@ func (tc *TaskCounter) ResetReachedCap() {
123128

124129
tc.reachedCap = make(map[string]struct{}, 32)
125130
}
131+
132+
// UpdateHookMetricsFromSnapshot updates metrics for all hooks based on a snapshot of hook counts.
133+
// Only hooks whose task count exceeds compactionMetricsThreshold are published, to avoid metric cardinality explosion.
134+
func (tc *TaskCounter) UpdateHookMetricsFromSnapshot(hookCounts map[string]uint) {
135+
if tc.metricStorage == nil {
136+
return
137+
}
138+
139+
tc.mu.Lock()
140+
defer tc.mu.Unlock()
141+
142+
// Clear tracking for hooks that are no longer above threshold
143+
for hookName := range tc.hookCounter {
144+
if count, exists := hookCounts[hookName]; !exists || count <= compactionMetricsThreshold {
145+
// Hook dropped to or below threshold, or disappeared - stop tracking it (its previously published gauge value is not reset by this function)
146+
delete(tc.hookCounter, hookName)
147+
}
148+
}
149+
150+
// Update metrics only for hooks above threshold
151+
for hookName, count := range hookCounts {
152+
if count > compactionMetricsThreshold {
153+
// Track and publish metric
154+
tc.hookCounter[hookName] = count
155+
156+
labels := map[string]string{
157+
"queue_name": tc.queueName,
158+
"hook": hookName,
159+
}
160+
tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionTasksByHook, float64(count), labels)
161+
}
162+
}
163+
}
Lines changed: 97 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,98 +1,99 @@
11
package queue
22

3-
// import (
4-
// "sync"
5-
// "testing"
6-
7-
// "github.com/stretchr/testify/require"
8-
9-
// "github.com/flant/shell-operator/pkg/hook/task_metadata"
10-
// "github.com/flant/shell-operator/pkg/metric"
11-
// "github.com/flant/shell-operator/pkg/metrics"
12-
// "github.com/flant/shell-operator/pkg/task"
13-
// )
14-
15-
// func TestTaskCounterRemoveUsesQueueName(t *testing.T) {
16-
// metricStorage := metric.NewStorageMock(t)
17-
18-
// type gaugeCall struct {
19-
// metric string
20-
// value float64
21-
// labels map[string]string
22-
// }
23-
24-
// var (
25-
// mu sync.Mutex
26-
// calls []gaugeCall
27-
// )
28-
29-
// metricStorage.GaugeSetMock.Set(func(metric string, value float64, labels map[string]string) {
30-
// cloned := make(map[string]string, len(labels))
31-
// for k, v := range labels {
32-
// cloned[k] = v
33-
// }
34-
35-
// mu.Lock()
36-
// calls = append(calls, gaugeCall{
37-
// metric: metric,
38-
// value: value,
39-
// labels: cloned,
40-
// })
41-
// mu.Unlock()
42-
// })
43-
44-
// tc := NewTaskCounter("main", nil, metricStorage)
45-
46-
// testTask := task.NewTask(task_metadata.HookRun).
47-
// WithCompactionID("test-hook")
48-
49-
// tc.Add(testTask)
50-
// tc.Remove(testTask)
51-
52-
// mu.Lock()
53-
// require.NotEmpty(t, calls)
54-
// lastCall := calls[len(calls)-1]
55-
// mu.Unlock()
56-
57-
// require.Equal(t, metrics.TasksQueueCompactionInQueueTasks, lastCall.metric)
58-
// require.Equal(t, float64(0), lastCall.value)
59-
// require.Equal(t, "main", lastCall.labels["queue_name"])
60-
// require.Equal(t, "test-hook", lastCall.labels["task_id"])
61-
// }
62-
63-
// func TestTaskCounterRemoveClearsReachedCap(t *testing.T) {
64-
// metricStorage := metric.NewStorageMock(t)
65-
66-
// var (
67-
// mu sync.Mutex
68-
// reachedValues []float64
69-
// )
70-
71-
// metricStorage.GaugeSetMock.Set(func(metric string, value float64, labels map[string]string) {
72-
// if metric == metrics.TasksQueueCompactionReached {
73-
// mu.Lock()
74-
// reachedValues = append(reachedValues, value)
75-
// mu.Unlock()
76-
// }
77-
// })
78-
79-
// tc := NewTaskCounter("main", nil, metricStorage)
80-
81-
// testTask := task.NewTask(task_metadata.HookRun).
82-
// WithCompactionID("test-hook")
83-
84-
// for i := 0; i < taskCap; i++ {
85-
// tc.Add(testTask)
86-
// }
87-
88-
// require.True(t, tc.IsAnyCapReached())
89-
90-
// tc.Remove(testTask)
91-
92-
// require.False(t, tc.IsAnyCapReached())
93-
94-
// mu.Lock()
95-
// require.NotEmpty(t, reachedValues)
96-
// require.Equal(t, float64(0), reachedValues[len(reachedValues)-1])
97-
// mu.Unlock()
98-
// }
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/flant/shell-operator/pkg/metric"
10+
"github.com/flant/shell-operator/pkg/metrics"
11+
)
12+
13+
func TestTaskCounterUpdateHookMetricsFromSnapshot(t *testing.T) {
14+
metricStorage := metric.NewStorageMock(t)
15+
16+
type gaugeCall struct {
17+
metric string
18+
value float64
19+
labels map[string]string
20+
}
21+
22+
var (
23+
mu sync.Mutex
24+
calls []gaugeCall
25+
)
26+
27+
metricStorage.GaugeSetMock.Set(func(metric string, value float64, labels map[string]string) {
28+
cloned := make(map[string]string, len(labels))
29+
for k, v := range labels {
30+
cloned[k] = v
31+
}
32+
33+
mu.Lock()
34+
calls = append(calls, gaugeCall{
35+
metric: metric,
36+
value: value,
37+
labels: cloned,
38+
})
39+
mu.Unlock()
40+
})
41+
42+
tc := NewTaskCounter("main", nil, metricStorage)
43+
44+
// Simulate initial state with hooks above threshold by setting up a snapshot
45+
initialSnapshot := map[string]uint{
46+
"hook1": 26,
47+
"hook2": 31,
48+
"hook3": 51,
49+
}
50+
tc.UpdateHookMetricsFromSnapshot(initialSnapshot)
51+
52+
mu.Lock()
53+
calls = nil // Clear previous calls
54+
mu.Unlock()
55+
56+
// Update with new snapshot where:
57+
// - hook1 is still above threshold (25 tasks)
58+
// - hook2 dropped below threshold (15 tasks) - should not be published
59+
// - hook3 is completely gone (absent from the new snapshot) - should not be published
60+
// - hook4 is new (30 tasks)
61+
newSnapshot := map[string]uint{
62+
"hook1": 25,
63+
"hook2": 15,
64+
"hook4": 30,
65+
}
66+
67+
tc.UpdateHookMetricsFromSnapshot(newSnapshot)
68+
69+
mu.Lock()
70+
defer mu.Unlock()
71+
72+
// Verify that metrics were set correctly
73+
require.NotEmpty(t, calls)
74+
75+
// Build a map of last call for each hook
76+
lastCallByHook := make(map[string]gaugeCall)
77+
for _, call := range calls {
78+
if call.metric == metrics.TasksQueueCompactionTasksByHook {
79+
hook := call.labels["hook"]
80+
lastCallByHook[hook] = call
81+
}
82+
}
83+
84+
// hook1: should have value 25
85+
require.Contains(t, lastCallByHook, "hook1")
86+
require.Equal(t, float64(25), lastCallByHook["hook1"].value)
87+
require.Equal(t, "main", lastCallByHook["hook1"].labels["queue_name"])
88+
89+
// hook2: should NOT be published (below threshold)
90+
require.NotContains(t, lastCallByHook, "hook2")
91+
92+
// hook3: should NOT be published (removed from snapshot and was above threshold before)
93+
require.NotContains(t, lastCallByHook, "hook3")
94+
95+
// hook4: should have value 30
96+
require.Contains(t, lastCallByHook, "hook4")
97+
require.Equal(t, float64(30), lastCallByHook["hook4"].value)
98+
require.Equal(t, "main", lastCallByHook["hook4"].labels["queue_name"])
99+
}

0 commit comments

Comments
 (0)