From 5021ca7a7e6525373b16b259046480edb5b6def1 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 18 Aug 2022 03:23:24 -0400 Subject: [PATCH] Resolve bug where TA doesn't allocate all targets (#1039) * Resolve bug where TA doesn't allocate all targets * Cleanup cleanup --- cmd/otel-allocator/allocation/allocator.go | 28 +++++---- .../allocation/allocator_test.go | 57 +++++++++++++++---- cmd/otel-allocator/allocation/http.go | 4 +- cmd/otel-allocator/main.go | 4 +- 4 files changed, 68 insertions(+), 25 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 93c41b0a48..c59e3fb503 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -40,6 +40,10 @@ type TargetItem struct { Collector *collector } +func (t TargetItem) hash() string { + return t.JobName + t.TargetURL + t.Label.Fingerprint().String() +} + // Create a struct that holds collector - and jobs for that collector // This struct will be parsed into endpoint with collector and jobs info @@ -59,11 +63,15 @@ type Allocator struct { collectors map[string]*collector // all current collectors - TargetItems map[string]*TargetItem + targetItems map[string]*TargetItem log logr.Logger } +func (allocator *Allocator) TargetItems() map[string]*TargetItem { + return allocator.targetItems +} + // findNextCollector finds the next collector with less number of targets. func (allocator *Allocator) findNextCollector() *collector { var col *collector @@ -91,7 +99,7 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { allocator.targetsWaiting = make(map[string]TargetItem, len(targets)) // Set new data for _, i := range targets { - allocator.targetsWaiting[i.JobName+i.TargetURL] = i + allocator.targetsWaiting[i.hash()] = i } } @@ -133,16 +141,16 @@ func (allocator *Allocator) ReallocateCollectors() { timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors")) defer timer.ObserveDuration() defer allocator.m.Unlock() - allocator.TargetItems = make(map[string]*TargetItem) + allocator.targetItems = make(map[string]*TargetItem) allocator.processWaitingTargets() } // removeOutdatedTargets removes targets that are no longer available. func (allocator *Allocator) removeOutdatedTargets() { - for k := range allocator.TargetItems { + for k := range allocator.targetItems { if _, ok := allocator.targetsWaiting[k]; !ok { - allocator.collectors[allocator.TargetItems[k].Collector.Name].NumTargets-- - delete(allocator.TargetItems, k) + allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets-- + delete(allocator.targetItems, k) } } } @@ -150,9 +158,9 @@ func (allocator *Allocator) removeOutdatedTargets() { // processWaitingTargets processes the newly set targets. func (allocator *Allocator) processWaitingTargets() { for k, v := range allocator.targetsWaiting { - if _, ok := allocator.TargetItems[k]; !ok { + if _, ok := allocator.targetItems[k]; !ok { col := allocator.findNextCollector() - allocator.TargetItems[k] = &v + allocator.targetItems[k] = &v targetItem := TargetItem{ JobName: v.JobName, Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))}, @@ -162,7 +170,7 @@ func (allocator *Allocator) processWaitingTargets() { } col.NumTargets++ targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets)) - allocator.TargetItems[v.JobName+v.TargetURL] = &targetItem + allocator.targetItems[v.hash()] = &targetItem } } } @@ -172,6 +180,6 @@ func NewAllocator(log logr.Logger) *Allocator { log: log, targetsWaiting: make(map[string]TargetItem), collectors: make(map[string]*collector), - TargetItems: make(map[string]*TargetItem), + targetItems: make(map[string]*TargetItem), } } diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go index 893422a3b4..596ff3b628 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/allocator_test.go @@ -45,11 +45,12 @@ func TestAddingAndRemovingTargets(t *testing.T) { cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) + labels := model.LabelSet{} initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} var targetList []TargetItem for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that targets and collectors are added properly @@ -58,13 +59,13 @@ func TestAddingAndRemovingTargets(t *testing.T) { // verify expectedTargetLen := len(initTargets) - assert.Len(t, s.TargetItems, expectedTargetLen) + assert.Len(t, s.TargetItems(), expectedTargetLen) // prepare second round of targets tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"} var newTargetList []TargetItem for _, i := range tar { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that less targets are found - removed @@ -73,11 +74,45 @@ func TestAddingAndRemovingTargets(t *testing.T) { // verify expectedNewTargetLen := len(tar) - assert.Len(t, s.TargetItems, expectedNewTargetLen) + assert.Len(t, s.TargetItems(), expectedNewTargetLen) // verify results map for _, i := range tar { - _, ok := s.TargetItems["sample-name"+i] + _, ok := s.TargetItems()["sample-name"+i+labels.Fingerprint().String()] + assert.True(t, ok) + } +} + +// Tests that two targets with the same target url and job name but different label set are both added +func TestAllocationCollision(t *testing.T) { + // prepare allocator with initial targets and collectors + s := NewAllocator(logger) + + cols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(cols) + firstLabels := model.LabelSet{ + "test": "test1", + } + secondLabels := model.LabelSet{ + "test": "test2", + } + + targetList := []TargetItem{ + TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, + TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, + } + + // test that targets and collectors are added properly + s.SetWaitingTargets(targetList) + s.AllocateTargets() + + // verify + expectedTargetLen := len(targetList) + assert.Len(t, s.TargetItems(), expectedTargetLen) + + // verify results map + for _, i := range targetList { + _, ok := s.TargetItems()[i.hash()] assert.True(t, ok) } } @@ -104,8 +139,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { // Divisor needed to get 15% divisor := 6.7 - count := len(s.TargetItems) / len(s.collectors) - percent := float64(len(s.TargetItems)) / divisor + count := len(s.TargetItems()) / len(s.collectors) + percent := float64(len(s.TargetItems())) / divisor // test for _, i := range s.collectors { @@ -123,8 +158,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { s.SetWaitingTargets(newTargetList) s.AllocateTargets() - count = len(s.TargetItems) / len(s.collectors) - percent = float64(len(s.TargetItems)) / divisor + count = len(s.TargetItems()) / len(s.collectors) + percent = float64(len(s.TargetItems())) / divisor // test for _, i := range s.collectors { @@ -141,8 +176,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { s.SetWaitingTargets(newTargetList) s.AllocateTargets() - count = len(s.TargetItems) / len(s.collectors) - percent = float64(len(s.TargetItems)) / divisor + count = len(s.TargetItems()) / len(s.collectors) + percent = float64(len(s.TargetItems())) / divisor // test for _, i := range s.collectors { diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index 49b6cb65bd..01f4e5c82e 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -23,7 +23,7 @@ type targetGroupJSON struct { func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *Allocator) map[string]collectorJSON { displayData := make(map[string]collectorJSON) - for _, j := range allocator.TargetItems { + for _, j := range allocator.TargetItems() { if j.JobName == job { var targetList []TargetItem targetList = append(targetList, cMap[j.Collector.Name+j.JobName]...) @@ -52,7 +52,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin if col.Name == collector { for _, targetItemArr := range cMap { for _, targetItem := range targetItemArr { - if targetItem.Collector.Name == collector && targetItem.JobName == job { + if targetItem.Collector.Name == collector && targetItem.JobName == job { group[targetItem.Label.String()] = targetItem.TargetURL labelSet[targetItem.TargetURL] = targetItem.Label } diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index c1806c2e67..c467ff4bfc 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -185,7 +185,7 @@ func (s *server) Shutdown(ctx context.Context) error { func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { displayData := make(map[string]allocation.LinkJSON) - for _, v := range s.allocator.TargetItems { + for _, v := range s.allocator.TargetItems() { displayData[v.JobName] = allocation.LinkJSON{v.Link.Link} } jsonHandler(w, r, displayData) @@ -206,7 +206,7 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { q := r.URL.Query()["collector_id"] var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem - for _, v := range s.allocator.TargetItems { + for _, v := range s.allocator.TargetItems() { compareMap[v.Collector.Name+v.JobName] = append(compareMap[v.Collector.Name+v.JobName], *v) } params := mux.Vars(r)