Skip to content

Commit

Permalink
address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Oct 31, 2022
1 parent a96c6c1 commit dd1281c
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 62 deletions.
17 changes: 8 additions & 9 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehook"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

Expand Down Expand Up @@ -53,10 +52,10 @@ type consistentHashingAllocator struct {

log logr.Logger

filterFunction Filter
filter Filter
}

func newConsistentHashingAllocator(log logr.Logger, opts ...AllocOption) Allocator {
func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
config := consistent.Config{
PartitionCount: 1061,
ReplicationFactor: 5,
Expand All @@ -77,9 +76,9 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocOption) Allocat
return chAllocator
}

// SetHook sets the filtering hook to use.
func (c *consistentHashingAllocator) SetFilter(filterFunction Filter) {
c.filterFunction = filterFunction
// SetFilter sets the filtering hook to use.
func (c *consistentHashingAllocator) SetFilter(filter Filter) {
c.filter = filter
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
Expand Down Expand Up @@ -161,10 +160,10 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Targe
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

if c.filterFunction != nil {
targets = c.filterFunction.Apply(targets)
if c.filter != nil {
targets = c.filter.Apply(targets)
}
prehook.RecordTargetsKeptPerJob(targets)
RecordTargetsKeptPerJob(targets)

c.m.Lock()
defer c.m.Unlock()
Expand Down
17 changes: 8 additions & 9 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehook"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -52,12 +51,12 @@ type leastWeightedAllocator struct {

log logr.Logger

filterFunction Filter
filter Filter
}

// SetHook sets the filtering hook to use.
func (allocator *leastWeightedAllocator) SetFilter(filterFunction Filter) {
allocator.filterFunction = filterFunction
// SetFilter sets the filtering hook to use.
func (allocator *leastWeightedAllocator) SetFilter(filter Filter) {
allocator.filter = filter
}

// TargetItems returns a shallow copy of the targetItems map.
Expand Down Expand Up @@ -173,10 +172,10 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.T
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName))
defer timer.ObserveDuration()

if allocator.filterFunction != nil {
targets = allocator.filterFunction.Apply(targets)
if allocator.filter != nil {
targets = allocator.filter.Apply(targets)
}
prehook.RecordTargetsKeptPerJob(targets)
RecordTargetsKeptPerJob(targets)

allocator.m.Lock()
defer allocator.m.Unlock()
Expand Down Expand Up @@ -216,7 +215,7 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co
}
}

func newLeastWeightedAllocator(log logr.Logger, opts ...AllocOption) Allocator {
func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
lwAllocator := &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
Expand Down
30 changes: 24 additions & 6 deletions cmd/otel-allocator/allocation/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type AllocatorProvider func(log logr.Logger, opts ...AllocOption) Allocator
type AllocatorProvider func(log logr.Logger, opts ...AllocationOption) Allocator

var (
registry = map[string]AllocatorProvider{}
Expand All @@ -45,21 +45,39 @@ var (
Name: "opentelemetry_allocator_time_to_allocate",
Help: "The time it takes to allocate",
}, []string{"method", "strategy"})
targetsKeptPerJob = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_targets_kept",
Help: "Number of targets kept after filtering.",
}, []string{"job_name"})
)

type AllocOption func(Allocator)
type AllocationOption func(Allocator)

type Filter interface {
Apply(map[string]*target.TargetItem) map[string]*target.TargetItem
}

func WithFilter(filterFunction Filter) AllocOption {
func WithFilter(filter Filter) AllocationOption {
return func(allocator Allocator) {
allocator.SetFilter(filterFunction)
allocator.SetFilter(filter)
}
}

func New(name string, log logr.Logger, opts ...AllocOption) (Allocator, error) {
func RecordTargetsKeptPerJob(targets map[string]*target.TargetItem) map[string]float64 {
targetsPerJob := make(map[string]float64)

for _, tItem := range targets {
targetsPerJob[tItem.JobName] += 1
}

for jName, numTargets := range targetsPerJob {
targetsKeptPerJob.WithLabelValues(jName).Set(numTargets)
}

return targetsPerJob
}

func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) {
if p, ok := registry[name]; ok {
return p(log, opts...), nil
}
Expand All @@ -79,7 +97,7 @@ type Allocator interface {
SetTargets(targets map[string]*target.TargetItem)
TargetItems() map[string]*target.TargetItem
Collectors() map[string]*Collector
SetFilter(filterFunction Filter)
SetFilter(filter Filter)
}

var _ consistent.Member = Collector{}
Expand Down
7 changes: 2 additions & 5 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,8 @@ func main() {
log := ctrl.Log.WithName("allocator")

// allocatorPrehook will be nil if filterStrategy is not set or
// unrecognized. This means no filtering will be used.
allocatorPrehook, err := prehook.New(cfg.GetTargetsFilterStrategy(), log)
if err != nil {
log.Info("Unrecognized filter strategy; filtering disabled")
}
// unrecognized. No filtering will be used in this case.
allocatorPrehook := prehook.New(cfg.GetTargetsFilterStrategy(), log)

allocator, err := allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook))
if err != nil {
Expand Down
32 changes: 5 additions & 27 deletions cmd/otel-allocator/prehook/prehook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ package prehook

import (
"errors"
"fmt"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/relabel"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
Expand All @@ -30,13 +27,6 @@ const (
relabelConfigTargetFilterName = "relabel-config"
)

var (
TargetsKept = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_targets_kept",
Help: "Number of targets kept after filtering.",
}, []string{"job_name"})
)

type Hook interface {
Apply(map[string]*target.TargetItem) map[string]*target.TargetItem
SetConfig(map[string][]*relabel.Config)
Expand All @@ -49,25 +39,13 @@ var (
registry = map[string]HookProvider{}
)

func RecordTargetsKeptPerJob(targets map[string]*target.TargetItem) map[string]float64 {
targetsPerJob := make(map[string]float64)

for _, tItem := range targets {
targetsPerJob[tItem.JobName] += 1
}

for jName, numTargets := range targetsPerJob {
TargetsKept.WithLabelValues(jName).Set(numTargets)
}

return targetsPerJob
}

func New(name string, log logr.Logger) (Hook, error) {
func New(name string, log logr.Logger) Hook {
if p, ok := registry[name]; ok {
return p(log.WithName("Prehook").WithName(name)), nil
return p(log.WithName("Prehook").WithName(name))
}
return nil, fmt.Errorf("unregistered filtering strategy: %s", name)

log.Info("Unrecognized filter strategy; filtering disabled")
return nil
}

func Register(name string, provider HookProvider) error {
Expand Down
12 changes: 6 additions & 6 deletions cmd/otel-allocator/prehook/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func makeNNewTargets(n int, numCollectors int, startingIndex int) (map[string]*t
}

func TestApply(t *testing.T) {
allocatorPrehook, err := New("relabel-config", logger)
assert.Nil(t, err)
allocatorPrehook := New("relabel-config", logger)
assert.NotNil(t, allocatorPrehook)

targets, numRemaining, expectedTargetMap, relabelCfg := makeNNewTargets(numTargets, 3, 0)
allocatorPrehook.SetConfig(relabelCfg)
Expand All @@ -180,8 +180,8 @@ func TestApply(t *testing.T) {

func TestApplyEmptyRelabelCfg(t *testing.T) {

allocatorPrehook, err := New("relabel-config", logger)
assert.Nil(t, err)
allocatorPrehook := New("relabel-config", logger)
assert.NotNil(t, allocatorPrehook)

targets, _, _, _ := makeNNewTargets(numTargets, 3, 0)

Expand All @@ -194,8 +194,8 @@ func TestApplyEmptyRelabelCfg(t *testing.T) {
}

func TestSetConfig(t *testing.T) {
allocatorPrehook, err := New("relabel-config", logger)
assert.Nil(t, err)
allocatorPrehook := New("relabel-config", logger)
assert.NotNil(t, allocatorPrehook)

_, _, _, relabelCfg := makeNNewTargets(numTargets, 3, 0)
allocatorPrehook.SetConfig(relabelCfg)
Expand Down

0 comments on commit dd1281c

Please sign in to comment.