Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start moving components of adaptive sampling to OSS #973

Merged
merged 7 commits into from
Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Move adaptive sampling to OSS
Signed-off-by: Won Jun Jang <wjang@uber.com>
  • Loading branch information
black-adder committed Nov 13, 2018
commit 09f226d16182c8aa98de645f6c60c9542a6a6c69
24 changes: 24 additions & 0 deletions plugin/sampling/strategystore/adaptive/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package adaptive

// samplingCacheEntry keeps track of the last calculated probability and whether
// a service-operation pair is currently subject to adaptive sampling.
type samplingCacheEntry struct {
	probability float64
	// NOTE(review): field name contains a typo (should be usingAdaptive);
	// left as-is because other files in this package may reference it.
	usingAdapative bool
}

// samplingCache is a two-level lookup of cached sampling state:
// service name -> operation name -> entry. It is a plain map and is
// therefore not safe for concurrent use without external synchronization.
type samplingCache map[string]map[string]*samplingCacheEntry

// Set stores entry under the given service and operation, lazily creating
// the per-service map on first use.
func (s samplingCache) Set(service, operation string, entry *samplingCacheEntry) {
	ops, ok := s[service]
	if !ok {
		ops = make(map[string]*samplingCacheEntry)
		s[service] = ops
	}
	ops[operation] = entry
}

// Get returns the cached entry for the service and operation, or nil if
// either level is missing. Indexing a nil inner map safely yields the zero
// value (nil), so no explicit existence check is needed.
func (s samplingCache) Get(service, operation string) *samplingCacheEntry {
	return s[service][operation]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package calculationstrategy

// ProbabilityCalculator calculates a new sampling probability given the target
// QPS, the currently observed QPS, and the previously used probability.
type ProbabilityCalculator interface {
	Calculate(targetQPS, curQPS, prevProbability float64) (newProbability float64)
}

// Func wraps a function of appropriate signature and makes a ProbabilityCalculator from it.
type Func func(targetQPS, curQPS, prevProbability float64) (newProbability float64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you envision ever having other methods in the interface? Consider:

type Calculate func(targetQPS, curQPS, prevProbability float64) (newProbability float64)


// Calculate implements the ProbabilityCalculator interface by delegating to
// the wrapped function.
func (f Func) Calculate(targetQPS, curQPS, prevProbability float64) float64 {
	return f(targetQPS, curQPS, prevProbability)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package calculationstrategy

const (
	// defaultPercentageIncreaseCap limits probability growth to +50% per
	// calculation when no explicit cap is supplied.
	defaultPercentageIncreaseCap = 0.5
)

// PercentageIncreaseCappedCalculator computes a new sampling probability while
// capping any increase to a configured percentage of the previous probability.
//
// Example with prevProb = 0.1, newProb = 0.5, cap = 0.5:
// (0.5 - 0.1)/0.1 is a 400% increase; the 50% cap limits the result to 0.15.
//
// Example with prevProb = 0.4, newProb = 0.5, cap = 0.5:
// (0.5 - 0.4)/0.4 is a 25% increase, under the cap, so the result is 0.5.
type PercentageIncreaseCappedCalculator struct {
	percentageIncreaseCap float64
}

// NewPercentageIncreaseCappedCalculator builds a calculator with the given
// cap; a cap of 0 selects the default of 50%.
func NewPercentageIncreaseCappedCalculator(percentageIncreaseCap float64) PercentageIncreaseCappedCalculator {
	limit := percentageIncreaseCap
	if limit == 0 {
		limit = defaultPercentageIncreaseCap
	}
	return PercentageIncreaseCappedCalculator{percentageIncreaseCap: limit}
}

// Calculate returns the next sampling probability for the operation.
func (c PercentageIncreaseCappedCalculator) Calculate(targetQPS, curQPS, prevProbability float64) float64 {
	ratio := targetQPS / curQPS
	proposed := prevProbability * ratio
	if ratio <= 1.0 {
		// Current QPS meets or exceeds the target: drop straight to the
		// proposed probability to curb oversampling immediately.
		return proposed
	}
	// Current QPS is below the target: raise the probability, but never by
	// more than the configured percentage of the previous value, so a
	// transient dip in traffic cannot trigger a huge jump in sampling.
	growth := (proposed - prevProbability) / prevProbability
	if growth > c.percentageIncreaseCap {
		proposed = prevProbability + (prevProbability * c.percentageIncreaseCap)
	}
	return proposed
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package calculationstrategy

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestCalculate verifies the capped probability calculation for QPS above,
// below, and near the target.
func TestCalculate(t *testing.T) {
	calculator := NewPercentageIncreaseCappedCalculator(0)
	tests := []struct {
		targetQPS           float64
		curQPS              float64
		oldProbability      float64
		expectedProbability float64
		testName            string
	}{
		{1.0, 2.0, 0.1, 0.05, "test1"},
		{1.0, 0.5, 0.1, 0.15, "test2"},
		{1.0, 0.8, 0.1, 0.125, "test3"},
	}
	for _, tt := range tests {
		// Run each table entry as a named subtest so failures identify the case.
		t.Run(tt.testName, func(t *testing.T) {
			probability := calculator.Calculate(tt.targetQPS, tt.curQPS, tt.oldProbability)
			// testify's InDelta takes (t, expected, actual, delta).
			assert.InDelta(t, tt.expectedProbability, probability, 0.0001)
		})
	}
}
207 changes: 207 additions & 0 deletions plugin/sampling/strategystore/adaptive/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package adaptive

import (
"time"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

// Default values applied by NewBuilder when the corresponding configuration
// fields are left unset.
const (
	defaultAggregationInterval          = time.Minute
	defaultTargetQPS                    = 1
	defaultEquivalenceThreshold         = 0.3
	defaultLookbackQPSCount             = 1
	defaultCalculationInterval          = time.Minute
	defaultLookbackInterval             = time.Minute * 10
	defaultDelay                        = time.Minute * 2
	defaultSamplingProbability          = 0.001
	defaultMinSamplingProbability       = 0.00001                                  // once in 100 thousand requests
	defaultLowerBoundTracesPerSecond    = 1.0 / (1 * float64(time.Minute/time.Second)) // once every 1 minute
	defaultLeaderLeaseRefreshInterval   = 5 * time.Second
	defaultFollowerLeaseRefreshInterval = 60 * time.Second
)

// ThroughputAggregatorConfig is the configuration for the ThroughputAggregator.
type ThroughputAggregatorConfig struct {
	// AggregationInterval determines how often throughput is aggregated and written to storage.
	AggregationInterval time.Duration `yaml:"aggregation_interval"`
}

// ProcessorConfig is the configuration for the SamplingProcessor.
type ProcessorConfig struct {
	// TargetQPS is the target sampled qps for all operations.
	TargetQPS float64 `yaml:"target_qps"`

	// QPSEquivalenceThreshold is the acceptable amount of deviation for the operation QPS from the `targetQPS`,
	// ie. [targetQPS-equivalenceThreshold, targetQPS+equivalenceThreshold] is the acceptable targetQPS range.
	// Increase this to reduce the amount of fluctuation in the probability calculation.
	QPSEquivalenceThreshold float64 `yaml:"qps_equivalence_threshold"`

	// LookbackQPSCount determines how many previous operation QPS are used in calculating the weighted QPS,
	// ie. if LookbackQPSCount is 1, only the most recent QPS will be used in calculating the weighted QPS.
	LookbackQPSCount int `yaml:"lookback_qps_count"`

	// CalculationInterval determines the interval each bucket represents, ie. if an interval is
	// 1 minute, the bucket will contain 1 minute of throughput data for all services.
	CalculationInterval time.Duration `yaml:"calculation_interval"`

	// LookbackInterval is the total amount of throughput data used to calculate probabilities.
	LookbackInterval time.Duration `yaml:"lookback_interval"`

	// Delay is the amount of time to delay probability generation by, ie. if the calculationInterval
	// is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time
	// we'll have [now()-12,now()-2] range of throughput data in memory to base the calculations
	// off of. This guards against using incomplete, still-arriving buckets.
	Delay time.Duration `yaml:"delay"`

	// DefaultSamplingProbability is the initial sampling probability for all new operations.
	DefaultSamplingProbability float64 `yaml:"default_sampling_probability"`

	// MinSamplingProbability is the minimum sampling probability for all operations, ie. the calculated
	// sampling probability will be bounded to [MinSamplingProbability, 1.0].
	MinSamplingProbability float64 `yaml:"min_sampling_probability"`

	// LowerBoundTracesPerSecond determines the lower bound number of traces that are sampled per second.
	// For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do
	// its best to sample at least one trace a minute for an operation. This is useful for a low QPS operation
	// that is never sampled by the probabilistic sampler and depends on some time based element.
	LowerBoundTracesPerSecond float64 `yaml:"lower_bound_traces_per_second"`

	// LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before
	// attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval
	// to reduce lock thrashing.
	LeaderLeaseRefreshInterval time.Duration `yaml:"leader_lease_refresh_interval"`

	// FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower
	// (ie. failed to gain the leader lock).
	FollowerLeaseRefreshInterval time.Duration `yaml:"follower_lease_refresh_interval"`

	// Mutable is a configuration holder that holds configurations that could dynamically change during
	// the lifetime of the processor.
	Mutable MutableProcessorConfigurator
}

// MutableProcessorConfigurator is a mutable config holder for certain configs that can change during
// the lifetime of the processor (see MutableProcessorConfig for a thread-safe
// implementation and ImmutableProcessorConfig for a static one).
type MutableProcessorConfigurator interface {
	GetTargetQPS() float64
	GetQPSEquivalenceThreshold() float64
}

// ImmutableProcessorConfig is a MutableProcessorConfigurator whose values are
// expected to stay fixed. The fields are exported and can technically be
// reassigned, but doing so is not thread safe.
type ImmutableProcessorConfig struct {
	TargetQPS               float64 `json:"target_qps"`
	QPSEquivalenceThreshold float64 `json:"qps_equivalence_threshold"`
}

// GetTargetQPS returns the configured target QPS, satisfying
// MutableProcessorConfigurator.
func (d ImmutableProcessorConfig) GetTargetQPS() float64 {
	return d.TargetQPS
}

// GetQPSEquivalenceThreshold returns the configured QPS equivalence threshold,
// satisfying MutableProcessorConfigurator.
func (d ImmutableProcessorConfig) GetQPSEquivalenceThreshold() float64 {
	return d.QPSEquivalenceThreshold
}

// MutableProcessorConfig is a thread-safe MutableProcessorConfigurator whose
// values may be swapped at runtime via Update.
type MutableProcessorConfig struct {
	targetQPS               *atomic.Float64
	qpsEquivalenceThreshold *atomic.Float64
}

// NewMutableProcessorConfig seeds a dynamically updatable config from the
// given immutable snapshot.
func NewMutableProcessorConfig(config ImmutableProcessorConfig) *MutableProcessorConfig {
	cfg := &MutableProcessorConfig{
		targetQPS:               atomic.NewFloat64(config.GetTargetQPS()),
		qpsEquivalenceThreshold: atomic.NewFloat64(config.GetQPSEquivalenceThreshold()),
	}
	return cfg
}

// Update atomically replaces both configuration values with those from config.
func (d *MutableProcessorConfig) Update(config ImmutableProcessorConfig) {
	d.targetQPS.Store(config.GetTargetQPS())
	d.qpsEquivalenceThreshold.Store(config.GetQPSEquivalenceThreshold())
}

// GetTargetQPS atomically loads the current target QPS, satisfying
// MutableProcessorConfigurator.
func (d *MutableProcessorConfig) GetTargetQPS() float64 {
	return d.targetQPS.Load()
}

// GetQPSEquivalenceThreshold atomically loads the current QPS equivalence
// threshold, satisfying MutableProcessorConfigurator.
func (d *MutableProcessorConfig) GetQPSEquivalenceThreshold() float64 {
	return d.qpsEquivalenceThreshold.Load()
}

// Builder struct to hold configurations for the throughput aggregator and the
// sampling processor, plus the shared metrics factory and logger.
type Builder struct {
	ThroughputAggregator ThroughputAggregatorConfig `yaml:"throughput_aggregator"`
	SamplingProcessor    ProcessorConfig            `yaml:"sampling_processor"`

	// unexported: set via WithMetricsFactory / WithLogger; defaulted by applyDefaults.
	metrics metrics.Factory
	logger  *zap.Logger
}

// NewBuilder creates a builder pre-populated with the package default values
// for every aggregator and processor setting.
func NewBuilder() *Builder {
	aggregator := ThroughputAggregatorConfig{
		AggregationInterval: defaultAggregationInterval,
	}
	processor := ProcessorConfig{
		LookbackQPSCount:             defaultLookbackQPSCount,
		CalculationInterval:          defaultCalculationInterval,
		LookbackInterval:             defaultLookbackInterval,
		Delay:                        defaultDelay,
		DefaultSamplingProbability:   defaultSamplingProbability,
		MinSamplingProbability:       defaultMinSamplingProbability,
		LowerBoundTracesPerSecond:    defaultLowerBoundTracesPerSecond,
		LeaderLeaseRefreshInterval:   defaultLeaderLeaseRefreshInterval,
		FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval,
		Mutable: ImmutableProcessorConfig{
			TargetQPS:               defaultTargetQPS,
			QPSEquivalenceThreshold: defaultEquivalenceThreshold,
		},
	}
	return &Builder{
		ThroughputAggregator: aggregator,
		SamplingProcessor:    processor,
	}
}

// WithMetricsFactory sets the metrics factory and returns the builder for chaining.
func (b *Builder) WithMetricsFactory(m metrics.Factory) *Builder {
	b.metrics = m
	return b
}

// WithLogger sets the logger and returns the builder for chaining.
func (b *Builder) WithLogger(l *zap.Logger) *Builder {
	b.logger = l
	return b
}

// applyDefaults fills in no-op implementations for any dependency the caller
// did not provide, so the constructed components never hit a nil logger or
// metrics factory.
func (b *Builder) applyDefaults() {
	if b.logger == nil {
		b.logger = zap.NewNop()
	}
	if b.metrics == nil {
		b.metrics = metrics.NullFactory
	}
}

// NewThroughputAggregator creates and returns a ThroughputAggregator backed by
// the given storage, applying default metrics/logger if none were set.
func (b *Builder) NewThroughputAggregator(storage samplingstore.Store) (Aggregator, error) {
	b.applyDefaults()
	return NewAggregator(b.metrics, b.ThroughputAggregator.AggregationInterval, storage), nil
}

// NewProcessor creates and returns a SamplingProcessor for the given host,
// storage, and distributed lock, applying default metrics/logger if none were set.
func (b *Builder) NewProcessor(hostname string, storage samplingstore.Store, lock distributedlock.Lock) (Processor, error) {
	b.applyDefaults()
	return NewProcessor(b.SamplingProcessor, hostname, storage, lock, b.metrics, b.logger)
}
Loading