Skip to content

Implemented tracing sampling by db #2220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions ydb/core/cms/console/jaeger_tracing_configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class TJaegerTracingConfigurator : public TActorBootstrapped<TJaegerTracingConfi

void ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg);
static TMaybe<ERequestType> GetRequestType(const NKikimrConfig::TTracingConfig::TSelectors& selectors);
static TMaybe<TString> GetDatabase(const NKikimrConfig::TTracingConfig::TSelectors& selectors);
static TSettings<double, TThrottlingSettings> GetSettings(const NKikimrConfig::TTracingConfig& cfg);

TSamplingThrottlingConfigurator TracingConfigurator;
Expand Down Expand Up @@ -85,18 +86,28 @@ TMaybe<ERequestType> TJaegerTracingConfigurator::GetRequestType(const NKikimrCon
return {};
}

TMaybe<TString> TJaegerTracingConfigurator::GetDatabase(const NKikimrConfig::TTracingConfig::TSelectors& selectors) {
if (selectors.HasDatabase()) {
return selectors.GetDatabase();
}
return NothingObject;
}

TSettings<double, TThrottlingSettings> TJaegerTracingConfigurator::GetSettings(const NKikimrConfig::TTracingConfig& cfg) {
TSettings<double, TThrottlingSettings> settings;

for (const auto& samplingRule : cfg.GetSampling()) {
const auto& scope = samplingRule.GetScope();

ERequestType requestType;
if (auto parsedRequestType = GetRequestType(samplingRule.GetScope())) {
if (auto parsedRequestType = GetRequestType(scope)) {
requestType = *parsedRequestType;
} else {
ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "failed to parse request type in the rule "
<< samplingRule.ShortDebugString() << ". Skipping the rule");
continue;
}

if (!samplingRule.HasLevel() || !samplingRule.HasFraction() || !samplingRule.HasMaxTracesPerMinute()) {
ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "missing required fields in rule " << samplingRule.ShortDebugString()
<< " (required fields are: level, fraction, max_traces_per_minute). Skipping the rule");
Expand Down Expand Up @@ -129,10 +140,19 @@ TSettings<double, TThrottlingSettings> TJaegerTracingConfigurator::GetSettings(c
.MaxTracesBurst = samplingRule.GetMaxTracesBurst(),
},
};
settings.SamplingRules[static_cast<size_t>(requestType)].push_back(rule);

auto& requestTypeRules = settings.SamplingRules[static_cast<size_t>(requestType)];
auto database = GetDatabase(scope);
if (database) {
requestTypeRules.DatabaseRules[*database].push_back(rule);
} else {
requestTypeRules.Global.push_back(rule);
}
}

for (const auto& throttlingRule : cfg.GetExternalThrottling()) {
const auto& scope = throttlingRule.GetScope();

ERequestType requestType;
if (auto parsedRequestType = GetRequestType(throttlingRule.GetScope())) {
requestType = *parsedRequestType;
Expand Down Expand Up @@ -161,14 +181,13 @@ TSettings<double, TThrottlingSettings> TJaegerTracingConfigurator::GetSettings(c
.MaxTracesBurst = maxBurst,
},
};
auto& currentRule = settings.ExternalThrottlingRules[static_cast<size_t>(requestType)];
if (currentRule) {
ALOG_WARN(NKikimrServices::CMS_CONFIGS, "duplicate external throttling rule for scope "
<< throttlingRule.GetScope() << ". Adding the limits");
currentRule->Throttler.MaxTracesBurst += rule.Throttler.MaxTracesBurst;
currentRule->Throttler.MaxTracesPerMinute += rule.Throttler.MaxTracesPerMinute;

auto& requestTypeRules = settings.ExternalThrottlingRules[static_cast<size_t>(requestType)];
auto database = GetDatabase(scope);
if (database) {
requestTypeRules.DatabaseRules[*database].push_back(rule);
} else {
currentRule = rule;
requestTypeRules.Global.push_back(rule);
}
}

Expand All @@ -181,7 +200,7 @@ TSettings<double, TThrottlingSettings> TJaegerTracingConfigurator::GetSettings(c
},
};

settings.ExternalThrottlingRules[static_cast<size_t>(ERequestType::UNSPECIFIED)] = rule;
settings.ExternalThrottlingRules[static_cast<size_t>(ERequestType::UNSPECIFIED)].Global.push_back(rule);
}

return settings;
Expand Down
32 changes: 31 additions & 1 deletion ydb/core/jaeger_tracing/sampling_throttling_configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,33 @@

namespace NKikimr::NJaegerTracing {

namespace {

template<class T>
void PropagateUnspecifiedRequest(TRulesContainer<T>& rules) {
constexpr auto unspecifiedRequestType = static_cast<size_t>(ERequestType::UNSPECIFIED);
const auto& unspecifiedRequestTypeRules = rules[unspecifiedRequestType];

for (size_t requestType = 0; requestType < kRequestTypesCnt; ++requestType) {
if (requestType == unspecifiedRequestType) {
continue;
}

auto& requestTypeDatabaseRules = rules[requestType].DatabaseRules;
auto& requestTypeGlobalRules = rules[requestType].Global;
for (const auto& [database, unspecifiedDatabaseRules] : unspecifiedRequestTypeRules.DatabaseRules) {
auto& databaseRules = requestTypeDatabaseRules[database];
databaseRules.insert(databaseRules.end(), unspecifiedDatabaseRules.begin(),
unspecifiedDatabaseRules.end());
}
requestTypeGlobalRules.insert(requestTypeGlobalRules.end(),
unspecifiedRequestTypeRules.Global.begin(),
unspecifiedRequestTypeRules.Global.end());
}
}

} // namespace anonymous

TSamplingThrottlingConfigurator::TSamplingThrottlingConfigurator(TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider>& randomProvider)
: TimeProvider(std::move(timeProvider))
Expand All @@ -23,7 +50,10 @@ TIntrusivePtr<TSamplingThrottlingControl> TSamplingThrottlingConfigurator::GetCo
}

void TSamplingThrottlingConfigurator::UpdateSettings(TSettings<double, TThrottlingSettings> settings) {
CurrentSettings = GenerateThrottlers(std::move(settings));
auto enrichedSettings = GenerateThrottlers(std::move(settings));
PropagateUnspecifiedRequest(enrichedSettings.SamplingRules);
PropagateUnspecifiedRequest(enrichedSettings.ExternalThrottlingRules);
CurrentSettings = std::move(enrichedSettings);

for (auto& control : IssuedControls) {
control->UpdateImpl(GenerateSetup());
Expand Down
72 changes: 39 additions & 33 deletions ydb/core/jaeger_tracing/sampling_throttling_control_internals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,60 @@

namespace NKikimr::NJaegerTracing {

namespace {

template<class T, class TAction>
void ForEachMatchingRule(TRequestTypeRules<T>& rules, const TMaybe<TString>& database, TAction&& action) {
for (auto& rule : rules.Global) {
action(rule);
}
if (database) {
if (auto databaseRules = rules.DatabaseRules.FindPtr(*database)) {
for (auto& rule : *databaseRules) {
action(rule);
}
}
}
}

} // namespace anonymous

void TSamplingThrottlingControl::TSamplingThrottlingImpl::HandleTracing(
NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator) {
auto requestType = discriminator.RequestType;
NWilson::TTraceId& traceId, TRequestDiscriminator discriminator) {
auto requestType = static_cast<size_t>(discriminator.RequestType);
auto database = std::move(discriminator.Database);

if (traceId) {
bool throttle = Throttle(requestType);
throttle = Throttle(ERequestType::UNSPECIFIED) && throttle;
bool throttle = true;

ForEachMatchingRule(
Setup.ExternalThrottlingRules[requestType], database,
[&throttle](auto& throttlingRule) {
throttle = throttlingRule.Throttler->Throttle() && throttle;
});

if (throttle) {
traceId = {};
}
}

if (!traceId) {
TMaybe<ui8> level;
if (auto sampled_level = Sample(requestType)) {
level = sampled_level;
}
if (auto sampled_level = Sample(ERequestType::UNSPECIFIED)) {
if (!level || *sampled_level > *level) {
level = sampled_level;
}
}
ForEachMatchingRule(
Setup.SamplingRules[requestType], database,
[&level](auto& samplingRule) {
if (!samplingRule.Sampler.Sample() || samplingRule.Throttler->Throttle()) {
return;
}
if (!level || samplingRule.Level > *level) {
level = samplingRule.Level;
}
});

if (level) {
traceId = NWilson::TTraceId::NewTraceId(*level, Max<ui32>());
}
}
}

bool TSamplingThrottlingControl::TSamplingThrottlingImpl::Throttle(ERequestType requestType) {
auto& throttlingRule = Setup.ExternalThrottlingRules[static_cast<size_t>(requestType)];
if (throttlingRule) {
return throttlingRule->Throttler->Throttle();
} else {
return true;
}
}

TMaybe<ui8> TSamplingThrottlingControl::TSamplingThrottlingImpl::Sample(ERequestType requestType) {
TMaybe<ui8> level;
for (auto& samplingRule : Setup.SamplingRules[static_cast<size_t>(requestType)]) {
if (samplingRule.Sampler.Sample() && !samplingRule.Throttler->Throttle()) {
if (!level || *level < samplingRule.Level) {
level = samplingRule.Level;
}
}
}
return level;
}

} // namespace NKikimr::NJaegerTracing
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ struct TSamplingThrottlingControl::TSamplingThrottlingImpl {

TSettings<TSampler, TIntrusivePtr<TThrottler>> Setup;

void HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator);

private:
bool Throttle(ERequestType requestType);

TMaybe<ui8> Sample(ERequestType requestType);
void HandleTracing(NWilson::TTraceId& traceId, TRequestDiscriminator discriminator);
};

} // namespace NKikimr::NJaegerTracing
Loading