Skip to content

Implemented first version of dynamic jaeger tracing configuration #1484

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/cms/console/configs_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ const THashSet<ui32> DYNAMIC_KINDS({
(ui32)NKikimrConsole::TConfigItem::TenantPoolConfigItem,
(ui32)NKikimrConsole::TConfigItem::TenantSlotBrokerConfigItem,
(ui32)NKikimrConsole::TConfigItem::AllowEditYamlInUiItem,
(ui32)NKikimrConsole::TConfigItem::BackgroundCleaningConfigItem
(ui32)NKikimrConsole::TConfigItem::BackgroundCleaningConfigItem,
(ui32)NKikimrConsole::TConfigItem::TracingConfigItem,
});

const THashSet<ui32> NON_YAML_KINDS({
Expand Down
82 changes: 82 additions & 0 deletions ydb/core/cms/console/jaeger_tracing_configurator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "jaeger_tracing_configurator.h"

#include "configs_dispatcher.h"
#include "console.h"

#include <ydb/core/jaeger_tracing/sampling_throttling_configurator.h>
#include <ydb/library/actors/core/actor.h>

namespace NKikimr::NConsole {

// Actor that owns a TSamplingThrottlingConfigurator and feeds it tracing configs:
// once at construction and then on every config notification it receives.
class TJaegerTracingConfigurator : public TActorBootstrapped<TJaegerTracingConfigurator> {
public:
    // Takes ownership of the configurator and applies the initial config `cfg`.
    TJaegerTracingConfigurator(
        NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
        const NKikimrConfig::TTracingConfig& cfg);

    // Activity type reported for this actor.
    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
        return NKikimrServices::TActivity::JAEGER_TRACING_CONFIGURATOR;
    }

    // Switches to StateWork and subscribes to TracingConfigItem updates.
    void Bootstrap(const TActorContext& ctx);

private:
    // Applies the tracing part of a pushed config and acknowledges the notification.
    void Handle(TEvConsole::TEvConfigNotificationRequest::TPtr& ev, const TActorContext& ctx);

    // Forwards `cfg` to the owned configurator; a defined result carries an error text.
    TMaybe<TString> ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg);

    STRICT_STFUNC(StateWork,
        HFunc(TEvConsole::TEvConfigNotificationRequest, Handle)
        IgnoreFunc(TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse)
    )

    NJaegerTracing::TSamplingThrottlingConfigurator TracingConfigurator;
};

// Stores the configurator and immediately applies the initial tracing config.
// A failure is only reported to Cerr; construction still completes.
TJaegerTracingConfigurator::TJaegerTracingConfigurator(
    NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
    const NKikimrConfig::TTracingConfig& cfg)
    : TracingConfigurator(std::move(tracingConfigurator))
{
    const TMaybe<TString> error = ApplyConfigs(cfg);
    if (error) {
        Cerr << "Failed to apply initial tracing configs: " << error.GetRef() << Endl;
    }
}

void TJaegerTracingConfigurator::Bootstrap(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: Bootstrap");
Become(&TThis::StateWork);

LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TJaegerTracingConfigurator: subscribe to config updates");
ui32 item = static_cast<ui32>(NKikimrConsole::TConfigItem::TracingConfigItem);
ctx.Send(MakeConfigsDispatcherID(SelfId().NodeId()),
new TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest(item));
}

// Handles a pushed config: logs it, applies its tracing section and replies to the
// sender. The response is sent whether or not applying the config succeeded.
void TJaegerTracingConfigurator::Handle(TEvConsole::TEvConfigNotificationRequest::TPtr& ev, const TActorContext& ctx) {
    auto& request = ev->Get()->Record;
    const auto& pushedConfig = request.GetConfig();

    LOG_INFO_S(ctx, NKikimrServices::CMS_CONFIGS,
               "TJaegerTracingConfigurator: got new config: "
               << pushedConfig.ShortDebugString());

    const TMaybe<TString> error = ApplyConfigs(pushedConfig.GetTracingConfig());
    if (error) {
        LOG_NOTICE_S(ctx, NKikimrServices::CMS_CONFIGS,
                     "TJaegerTracingConfigurator: Failed to apply tracing configs: " << error.GetRef());
    }

    // The response is built from the request record and echoes the request cookie.
    auto response = MakeHolder<TEvConsole::TEvConfigNotificationResponse>(request);
    LOG_TRACE_S(ctx, NKikimrServices::CMS_CONFIGS,
                "TJaegerTracingConfigurator: Send TEvConfigNotificationResponse: "
                << response->Record.ShortDebugString());
    ctx.Send(ev->Sender, response.Release(), 0, ev->Cookie);
}

// Hands `cfg` to the owned configurator and returns its result unchanged.
// Callers in this file treat a defined value as an error description.
TMaybe<TString> TJaegerTracingConfigurator::ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg) {
    TMaybe<TString> outcome = TracingConfigurator.HandleConfigs(cfg);
    return outcome;
}

// Factory: builds the configurator actor, moving `tracingConfigurator` into it.
// Returns a raw pointer allocated with `new`.
IActor* CreateJaegerTracingConfigurator(
    NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
    const NKikimrConfig::TTracingConfig& cfg)
{
    IActor* const actor = new TJaegerTracingConfigurator(std::move(tracingConfigurator), cfg);
    return actor;
}

} // namespace NKikimr::NConsole
13 changes: 13 additions & 0 deletions ydb/core/cms/console/jaeger_tracing_configurator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include "defs.h"

#include <ydb/core/jaeger_tracing/sampling_throttling_configurator.h>
#include <ydb/core/protos/config.pb.h>

namespace NKikimr::NConsole {

// Creates the actor that keeps Jaeger tracing sampling/throttling settings up to date.
//   tracingConfigurator - configurator the actor takes over (passed by value).
//   cfg                 - initial tracing config handed to the actor's constructor.
// Returns a newly allocated actor.
IActor* CreateJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
const NKikimrConfig::TTracingConfig& cfg);

} // namespace NKikimr::NConsole
2 changes: 2 additions & 0 deletions ydb/core/cms/console/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ SRCS(
http.h
immediate_controls_configurator.cpp
immediate_controls_configurator.h
jaeger_tracing_configurator.cpp
jaeger_tracing_configurator.h
log_settings_configurator.cpp
log_settings_configurator.h
logger.cpp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/factories.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ struct TModuleFactories {
std::shared_ptr<NHttpProxy::IAuthFactory> DataStreamsAuthFactory;
std::vector<NKikimr::NMiniKQL::TComputationNodeFactory> AdditionalComputationNodeFactories;

std::unique_ptr<NWilson::IGrpcSigner>(*WilsonGrpcSignerFactory)(const NKikimrConfig::TTracingConfig::TAuthConfig&);
std::unique_ptr<NWilson::IGrpcSigner>(*WilsonGrpcSignerFactory)(const NKikimrConfig::TTracingConfig::TBackendConfig::TAuthConfig&);

~TModuleFactories();
};
Expand Down
50 changes: 35 additions & 15 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <ydb/core/cms/console/configs_cache.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/cms/console/immediate_controls_configurator.h>
#include <ydb/core/cms/console/jaeger_tracing_configurator.h>
#include <ydb/core/cms/console/log_settings_configurator.h>
#include <ydb/core/cms/console/shared_cache_configurator.h>
#include <ydb/core/cms/console/validators/core_validators.h>
Expand Down Expand Up @@ -826,22 +827,34 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
}
}

if (Config.HasTracingConfig()) {
const auto& tracing = Config.GetTracingConfig();
if (Config.HasTracingConfig() && Config.GetTracingConfig().HasBackend()) {
const auto& tracing_backend = Config.GetTracingConfig().GetBackend();
std::unique_ptr<NWilson::IGrpcSigner> grpcSigner;
if (tracing.HasAuthConfig() && Factories && Factories->WilsonGrpcSignerFactory) {
grpcSigner = Factories->WilsonGrpcSignerFactory(tracing.GetAuthConfig());
if (tracing_backend.HasAuthConfig() && Factories && Factories->WilsonGrpcSignerFactory) {
grpcSigner = Factories->WilsonGrpcSignerFactory(tracing_backend.GetAuthConfig());
}
NActors::IActor* wilsonUploader = nullptr;
switch (tracing_backend.GetBackendCase()) {
case NKikimrConfig::TTracingConfig::TBackendConfig::BackendCase::kOpentelemetry: {
const auto& opentelemetry = tracing_backend.GetOpentelemetry();
wilsonUploader = NWilson::WilsonUploaderParams {
.CollectorUrl = opentelemetry.GetCollectorUrl(),
.ServiceName = opentelemetry.GetServiceName(),
.GrpcSigner = std::move(grpcSigner),
}.CreateUploader();
break;
}

case NKikimrConfig::TTracingConfig::TBackendConfig::BackendCase::BACKEND_NOT_SET: {
Y_DEBUG_ABORT_UNLESS(false, "No backend option was provided in backend config");
break;
}
}
if (wilsonUploader) {
setup->LocalServices.emplace_back(
NWilson::MakeWilsonUploaderId(),
TActorSetupCmd(wilsonUploader, TMailboxType::ReadAsFilled, appData->BatchPoolId));
}
auto wilsonUploader = NWilson::WilsonUploaderParams {
.Host = tracing.GetHost(),
.Port = static_cast<ui16>(tracing.GetPort()),
.RootCA = tracing.GetRootCA(),
.ServiceName = tracing.GetServiceName(),
.GrpcSigner = std::move(grpcSigner),
}.CreateUploader();
setup->LocalServices.emplace_back(
NWilson::MakeWilsonUploaderId(),
TActorSetupCmd(wilsonUploader, TMailboxType::ReadAsFilled, appData->BatchPoolId));
}
}

Expand Down Expand Up @@ -1616,15 +1629,22 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se

if (!IsServiceInitialized(setup, NGRpcService::CreateGRpcRequestProxyId(0))) {
const size_t proxyCount = Config.HasGRpcConfig() ? Config.GetGRpcConfig().GetGRpcProxyCount() : 1UL;
NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator(appData->TimeProvider, appData->RandomProvider);
for (size_t i = 0; i < proxyCount; ++i) {
auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck()
? NGRpcService::CreateGRpcRequestProxySimple(Config)
: NGRpcService::CreateGRpcRequestProxy(Config, appData->Icb);
: NGRpcService::CreateGRpcRequestProxy(Config, tracingConfigurator.GetControl());
setup->LocalServices.push_back(std::pair<TActorId,
TActorSetupCmd>(NGRpcService::CreateGRpcRequestProxyId(i),
TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled,
appData->UserPoolId)));
}
setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(
TActorId(),
TActorSetupCmd(
NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Config.GetTracingConfig()),
TMailboxType::ReadAsFilled,
appData->UserPoolId)));
}

if (!IsServiceInitialized(setup, NKesus::MakeKesusProxyServiceId())) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ PEERDIR(
ydb/core/grpc_services/auth_processor
ydb/core/health_check
ydb/core/http_proxy
ydb/core/jaeger_tracing
ydb/core/kesus/proxy
ydb/core/kesus/tablet
ydb/core/keyvalue
Expand Down
49 changes: 7 additions & 42 deletions ydb/core/grpc_services/grpc_request_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/cms/console/console.h>
#include <ydb/core/control/common_controls/tracing_control.h>
#include <ydb/core/grpc_services/counters/proxy_counters.h>
#include <ydb/core/jaeger_tracing/sampling_throttling_control.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/scheme_board/scheme_board.h>
#include <ydb/library/wilson_ids/wilson.h>
Expand Down Expand Up @@ -61,9 +62,9 @@ class TGRpcRequestProxyImpl
{
using TBase = TActorBootstrapped<TGRpcRequestProxyImpl>;
public:
explicit TGRpcRequestProxyImpl(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<TControlBoard> icb)
explicit TGRpcRequestProxyImpl(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> tracingControl)
: ChannelBufferSize(appConfig.GetTableServiceConfig().GetResourceManager().GetChannelBufferSize())
, Icb(std::move(icb))
, TracingControl(std::move(tracingControl))
{ }

void Bootstrap(const TActorContext& ctx);
Expand All @@ -82,8 +83,6 @@ class TGRpcRequestProxyImpl
void HandleSchemeBoard(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev);
void ReplayEvents(const TString& databaseName, const TActorContext& ctx);

static TString InternalRequestTypeToControlDomain(const TString& type);
TTracingControl& GetTracingControl(const TString& type);
void MaybeStartTracing(IRequestProxyCtx& ctx);

static bool IsAuthStateOK(const IRequestProxyCtx& ctx);
Expand Down Expand Up @@ -315,8 +314,7 @@ class TGRpcRequestProxyImpl
bool DynamicNode = false;
TString RootDatabase;
IGRpcProxyCounters::TPtr Counters;
THashMap<TString, TTracingControl> TracingControls;
TIntrusivePtr<TControlBoard> Icb;
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> TracingControl;
};

void TGRpcRequestProxyImpl::Bootstrap(const TActorContext& ctx) {
Expand Down Expand Up @@ -415,31 +413,6 @@ bool TGRpcRequestProxyImpl::IsAuthStateOK(const IRequestProxyCtx& ctx) {
state.NeedAuth == false && !ctx.GetYdbToken();
}

// Derives a control-domain name from an internal request type by dropping a
// leading "Ydb." and then a trailing "Request", each only when present.
TString TGRpcRequestProxyImpl::InternalRequestTypeToControlDomain(const TString& type) {
    static constexpr TStringBuf ydbNamespacePrefix = "Ydb.";
    static constexpr TStringBuf requestSuffix = "Request";

    // Trim a non-owning view first, then materialize the result once.
    TStringBuf domainView(type);
    domainView.SkipPrefix(ydbNamespacePrefix);
    domainView.ChopSuffix(requestSuffix);

    return TString(domainView);
}

// Returns the tracing control cached for `type`, creating and caching it on first use.
// New controls are registered under the "TracingControls.<domain>" name.
TTracingControl& TGRpcRequestProxyImpl::GetTracingControl(const TString& type) {
    auto cached = TracingControls.find(type);
    if (cached == TracingControls.end()) {
        auto controlDomain = TString::Join("TracingControls.", InternalRequestTypeToControlDomain(type));
        TTracingControl freshControl(Icb, TAppData::TimeProvider, TAppData::RandomProvider, std::move(controlDomain));
        auto insertion = TracingControls.emplace(type, std::move(freshControl));
        return insertion.first->second;
    }
    return cached->second;
}

void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) {
auto requestType = ctx.GetInternalRequestType();
if (requestType.empty()) {
Expand All @@ -449,15 +422,7 @@ void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) {
if (const auto otelHeader = ctx.GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER)) {
traceId = NWilson::TTraceId::FromTraceparentHeader(otelHeader.GetRef());
}
auto& control = GetTracingControl(requestType);
if (traceId && control.ThrottleExternal()) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Dropping external traceId " << traceId.GetHexTraceId() << " for request type " << requestType);
traceId = {};
}
if (!traceId && control.SampleThrottle()) {
traceId = NWilson::TTraceId::NewTraceId(control.SampledVerbosity(), 4095);
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Created new traceId " << traceId.GetHexTraceId() << " for request type " << requestType);
}
TracingControl->HandleTracing(traceId, NJaegerTracing::TRequestDiscriminator{});
if (traceId) {
NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy");
ctx.StartTracing(std::move(grpcRequestProxySpan));
Expand Down Expand Up @@ -617,8 +582,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
}
}

IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<TControlBoard> icb) {
return new TGRpcRequestProxyImpl(appConfig, std::move(icb));
IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> tracingControl) {
return new TGRpcRequestProxyImpl(appConfig, std::move(tracingControl));
}

} // namespace NGRpcService
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/grpc_services/grpc_request_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "grpc_request_proxy_handle_methods.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/jaeger_tracing/sampling_throttling_control.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -23,7 +24,7 @@ struct TAppData;
namespace NGRpcService {

TString DatabaseFromDomain(const TAppData* appdata);
IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<TControlBoard> icb);
IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> tracingControl);
IActor* CreateGRpcRequestProxySimple(const NKikimrConfig::TAppConfig& appConfig);

class TGRpcRequestProxy : public TGRpcRequestProxyHandleMethods, public IFacilityProvider {
Expand Down
27 changes: 27 additions & 0 deletions ydb/core/jaeger_tracing/control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "library/cpp/deprecated/atomic/atomic.h"

#include <util/generic/ptr.h>

namespace NKikimr::NJaegerTracing {

// Holder of a single value that is read and written through
// AtomicGet/AtomicSet. Derives from TThrRefBase.
struct TControl : TThrRefBase {
    TControl(ui64 initial = 0)
        : Value(initial)
    {
    }

    // Stores `newValue` atomically.
    void Set(ui64 newValue) {
        AtomicSet(Value, newValue);
    }

    // Loads the current value atomically.
    ui64 Get() const {
        return static_cast<ui64>(AtomicGet(Value));
    }

    TAtomic Value;
};

} // namespace NKikimr::NJaegerTracing
23 changes: 23 additions & 0 deletions ydb/core/jaeger_tracing/control_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include "control.h"

namespace NKikimr::NJaegerTracing {

// Value-like handle to a heap-allocated TControl. The control is held through a
// TIntrusivePtr, so copies of a wrapper refer to the same underlying TControl.
class TControlWrapper {
public:
    // Allocates a new TControl holding `initial`.
    TControlWrapper(ui64 initial = 0)
        : Control(MakeIntrusive<TControl>(initial))
    {
    }

    // Writes `newValue` into the underlying control.
    void Set(ui64 newValue) {
        Control->Set(newValue);
    }

    // Reads the current value of the underlying control.
    ui64 Get() const {
        return Control->Get();
    }

private:
    TIntrusivePtr<TControl> Control;
};

} // namespace NKikimr::NJaegerTracing
8 changes: 8 additions & 0 deletions ydb/core/jaeger_tracing/request_discriminator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

namespace NKikimr::NJaegerTracing {

// Describes a request for tracing decisions. It currently has no fields.
struct TRequestDiscriminator {};

} // namespace NKikimr::NJaegerTracing
Loading