Skip to content

Commit

Permalink
Resolve bug where TA doesn't allocate all targets
Browse files Browse the repository at this point in the history
  • Loading branch information
jaronoff97 committed Aug 16, 2022
1 parent 388a7c9 commit 594462d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 25 deletions.
29 changes: 19 additions & 10 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type TargetItem struct {
Collector *collector
}

func (t TargetItem) hash() string {
return t.JobName + t.TargetURL + t.Label.Fingerprint().String()
}

// Create a struct that holds collector - and jobs for that collector
// This struct will be parsed into endpoint with collector and jobs info

Expand All @@ -59,11 +63,15 @@ type Allocator struct {

collectors map[string]*collector // all current collectors

TargetItems map[string]*TargetItem
targetItems map[string]*TargetItem

log logr.Logger
}

func (allocator *Allocator) TargetItems() map[string]*TargetItem {
return allocator.targetItems
}

// findNextCollector finds the next collector with less number of targets.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
Expand Down Expand Up @@ -91,8 +99,9 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
allocator.targetsWaiting = make(map[string]TargetItem, len(targets))
// Set new data
for _, i := range targets {
allocator.targetsWaiting[i.JobName+i.TargetURL] = i
allocator.targetsWaiting[i.hash()] = i
}
//allocator.log.Info(fmt.Sprintf("len(targetsWaiting): %d\nlen(targets): %d", len(allocator.targetsWaiting), len(targets)))
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
Expand Down Expand Up @@ -133,26 +142,26 @@ func (allocator *Allocator) ReallocateCollectors() {
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.TargetItems = make(map[string]*TargetItem)
allocator.targetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available.
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.TargetItems {
for k := range allocator.targetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
allocator.collectors[allocator.TargetItems[k].Collector.Name].NumTargets--
delete(allocator.TargetItems, k)
allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets--
delete(allocator.targetItems, k)
}
}
}

// processWaitingTargets processes the newly set targets.
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.TargetItems[k]; !ok {
if _, ok := allocator.targetItems[k]; !ok {
col := allocator.findNextCollector()
allocator.TargetItems[k] = &v
allocator.targetItems[k] = &v
targetItem := TargetItem{
JobName: v.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))},
Expand All @@ -162,7 +171,7 @@ func (allocator *Allocator) processWaitingTargets() {
}
col.NumTargets++
targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets))
allocator.TargetItems[v.JobName+v.TargetURL] = &targetItem
allocator.targetItems[v.hash()] = &targetItem
}
}
}
Expand All @@ -172,6 +181,6 @@ func NewAllocator(log logr.Logger) *Allocator {
log: log,
targetsWaiting: make(map[string]TargetItem),
collectors: make(map[string]*collector),
TargetItems: make(map[string]*TargetItem),
targetItems: make(map[string]*TargetItem),
}
}
57 changes: 46 additions & 11 deletions cmd/otel-allocator/allocation/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ func TestAddingAndRemovingTargets(t *testing.T) {

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)
labels := model.LabelSet{}

initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"}
var targetList []TargetItem
for _, i := range initTargets {
targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels})
}

// test that targets and collectors are added properly
Expand All @@ -58,13 +59,13 @@ func TestAddingAndRemovingTargets(t *testing.T) {

// verify
expectedTargetLen := len(initTargets)
assert.Len(t, s.TargetItems, expectedTargetLen)
assert.Len(t, s.TargetItems(), expectedTargetLen)

// prepare second round of targets
tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"}
var newTargetList []TargetItem
for _, i := range tar {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels})
}

// test that less targets are found - removed
Expand All @@ -73,11 +74,45 @@ func TestAddingAndRemovingTargets(t *testing.T) {

// verify
expectedNewTargetLen := len(tar)
assert.Len(t, s.TargetItems, expectedNewTargetLen)
assert.Len(t, s.TargetItems(), expectedNewTargetLen)

// verify results map
for _, i := range tar {
_, ok := s.TargetItems["sample-name"+i]
_, ok := s.TargetItems()["sample-name"+i+labels.Fingerprint().String()]
assert.True(t, ok)
}
}

// Tests that two targets with the same target url and job name but different label set are both added
func TestAllocationCollision(t *testing.T) {
// prepare allocator with initial targets and collectors
s := NewAllocator(logger)

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)
firstLabels := model.LabelSet{
"test": "test1",
}
secondLabels := model.LabelSet{
"test": "test2",
}

targetList := []TargetItem{
TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels},
TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels},
}

// test that targets and collectors are added properly
s.SetWaitingTargets(targetList)
s.AllocateTargets()

// verify
expectedTargetLen := len(targetList)
assert.Len(t, s.TargetItems(), expectedTargetLen)

// verify results map
for _, i := range targetList {
_, ok := s.TargetItems()[i.hash()]
assert.True(t, ok)
}
}
Expand All @@ -104,8 +139,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
// Divisor needed to get 15%
divisor := 6.7

count := len(s.TargetItems) / len(s.collectors)
percent := float64(len(s.TargetItems)) / divisor
count := len(s.TargetItems()) / len(s.collectors)
percent := float64(len(s.TargetItems())) / divisor

// test
for _, i := range s.collectors {
Expand All @@ -123,8 +158,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor
count = len(s.TargetItems()) / len(s.collectors)
percent = float64(len(s.TargetItems())) / divisor

// test
for _, i := range s.collectors {
Expand All @@ -141,8 +176,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor
count = len(s.TargetItems()) / len(s.collectors)
percent = float64(len(s.TargetItems())) / divisor

// test
for _, i := range s.collectors {
Expand Down
4 changes: 2 additions & 2 deletions cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type targetGroupJSON struct {

func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *Allocator) map[string]collectorJSON {
displayData := make(map[string]collectorJSON)
for _, j := range allocator.TargetItems {
for _, j := range allocator.TargetItems() {
if j.JobName == job {
var targetList []TargetItem
targetList = append(targetList, cMap[j.Collector.Name+j.JobName]...)
Expand Down Expand Up @@ -52,7 +52,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin
if col.Name == collector {
for _, targetItemArr := range cMap {
for _, targetItem := range targetItemArr {
if targetItem.Collector.Name == collector && targetItem.JobName == job {
if targetItem.Collector.Name == collector && targetItem.JobName == job {
group[targetItem.Label.String()] = targetItem.TargetURL
labelSet[targetItem.TargetURL] = targetItem.Label
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
ctrl "sigs.k8s.io/controller-runtime"
//_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

var (
Expand Down Expand Up @@ -185,7 +186,7 @@ func (s *server) Shutdown(ctx context.Context) error {

func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) {
displayData := make(map[string]allocation.LinkJSON)
for _, v := range s.allocator.TargetItems {
for _, v := range s.allocator.TargetItems() {
displayData[v.JobName] = allocation.LinkJSON{v.Link.Link}
}
jsonHandler(w, r, displayData)
Expand All @@ -206,7 +207,7 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()["collector_id"]

var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem
for _, v := range s.allocator.TargetItems {
for _, v := range s.allocator.TargetItems() {
compareMap[v.Collector.Name+v.JobName] = append(compareMap[v.Collector.Name+v.JobName], *v)
}
params := mux.Vars(r)
Expand Down

0 comments on commit 594462d

Please sign in to comment.