Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Pushkar Mishra <pushkarmishra029@gmail.com>
  • Loading branch information
Pushkarm029 committed May 15, 2024
1 parent 976ee74 commit b417348
Show file tree
Hide file tree
Showing 3 changed files with 995 additions and 98 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 The Jaeger Authors.
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
package adaptive

import (
"context"
"errors"
"math"
"math/rand"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/calculationstrategy"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand Down Expand Up @@ -138,6 +140,16 @@ func newPostAggregator(
}, nil
}

// GetSamplingStrategy implements Thrift endpoint for retrieving sampling strategy for a service.
func (ss *StrategyStore) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) {
ss.RLock()
defer ss.RUnlock()
if strategy, ok := ss.strategyResponses[service]; ok {
return strategy, nil
}
return ss.generateDefaultSamplingStrategyResponse(), nil
}

// Start initializes and starts the sampling processor which regularly calculates sampling probabilities.
func (p *PostAggregator) Start() error {
p.logger.Info("starting adaptive sampling processor")
Expand All @@ -156,6 +168,14 @@ func (p *PostAggregator) runBackground(f func()) {
}()
}

func (ss *StrategyStore) runBackground(f func()) {
ss.bgFinished.Add(1)
go func() {
f()
ss.bgFinished.Done()
}()
}

// Close stops the processor from calculating probabilities.
func (p *PostAggregator) Close() error {
p.logger.Info("stopping adaptive sampling processor")
Expand All @@ -164,10 +184,51 @@ func (p *PostAggregator) Close() error {
return err
}

func (ss *StrategyStore) loadProbabilities() {
// TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization
probabilities, err := ss.storage.GetLatestProbabilities()
if err != nil {
ss.logger.Warn("failed to initialize probabilities", zap.Error(err))
return
}
ss.Lock()
defer ss.Unlock()
ss.probabilities = probabilities
}

// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage.
// The follower updates its local cache with the latest probabilities and serves them.
func (ss *StrategyStore) runUpdateProbabilitiesLoop() {
select {
case <-time.After(addJitter(ss.followerRefreshInterval)):
case <-ss.shutdown:
return
}

ticker := time.NewTicker(ss.followerRefreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Only load probabilities if this processor doesn't hold the leader lock
if !ss.isLeader() {
ss.loadProbabilities()
ss.generateStrategyResponses()
}
case <-ss.shutdown:
return
}
}
}

func (p *PostAggregator) isLeader() bool {
return p.electionParticipant.IsLeader()
}

func (ss *StrategyStore) isLeader() bool {
return ss.electionParticipant.IsLeader()
}

// addJitter adds a random amount of time. Without jitter, if the host holding the leader
// lock were to die, then all other collectors can potentially wait for a full cycle before
// trying to acquire the lock. With jitter, we can reduce the average amount of time before a
Expand Down Expand Up @@ -425,3 +486,40 @@ func (p *PostAggregator) isUsingAdaptiveSampling(
}
return false
}

// generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities.
func (ss *StrategyStore) generateStrategyResponses() {
ss.RLock()
strategies := make(map[string]*api_v2.SamplingStrategyResponse)
for svc, opProbabilities := range ss.probabilities {
opStrategies := make([]*api_v2.OperationSamplingStrategy, len(opProbabilities))
var idx int
for op, probability := range opProbabilities {
opStrategies[idx] = &api_v2.OperationSamplingStrategy{
Operation: op,
ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
SamplingRate: probability,
},
}
idx++
}
strategy := ss.generateDefaultSamplingStrategyResponse()
strategy.OperationSampling.PerOperationStrategies = opStrategies
strategies[svc] = strategy
}
ss.RUnlock()

ss.Lock()
defer ss.Unlock()
ss.strategyResponses = strategies
}

func (ss *StrategyStore) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse {
return &api_v2.SamplingStrategyResponse{
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
OperationSampling: &api_v2.PerOperationSamplingStrategies{
DefaultSamplingProbability: ss.InitialSamplingProbability,
DefaultLowerBoundTracesPerSecond: ss.MinSamplesPerSecond,
},
}
}
Loading

0 comments on commit b417348

Please sign in to comment.