Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/base/events.h>

#include <ydb/library/actors/wilson/wilson_span.h>

#include <util/stream/str.h>

namespace NKikimr {
Expand Down Expand Up @@ -361,6 +363,10 @@ class IRequestProxyCtx : public virtual IRequestCtxBase {
virtual void ReplyUnauthenticated(const TString& msg = "") = 0;
virtual void ReplyUnavaliable() = 0;

//tracing
virtual void StartTracing(NWilson::TSpan&& span) = 0;
virtual void LegacyFinishSpan() = 0;

// validation
virtual bool Validate(TString& error) = 0;

Expand Down Expand Up @@ -477,6 +483,9 @@ class TRefreshTokenImpl
return Token_;
}

void StartTracing(NWilson::TSpan&& /*span*/) override {}
void LegacyFinishSpan() override {}

void UpdateAuthState(NYdbGrpc::TAuthState::EAuthState state) override {
State_.State = state;
}
Expand Down Expand Up @@ -598,7 +607,7 @@ class TRefreshTokenImpl
return {};
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
NWilson::TTraceId GetWilsonTraceId() const override {
return {};
}

Expand Down Expand Up @@ -821,8 +830,8 @@ class TGRpcRequestBiStreamWrapper
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
return GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER);
NWilson::TTraceId GetWilsonTraceId() const override {
return Span_.GetTraceId();
}

const TMaybe<TString> GetSdkBuildInfo() const {
Expand Down Expand Up @@ -872,6 +881,16 @@ class TGRpcRequestBiStreamWrapper
Y_ABORT("unimplemented for TGRpcRequestBiStreamWrapper");
}

// IRequestProxyCtx
//
void StartTracing(NWilson::TSpan&& span) override {
Span_ = std::move(span);
}

void LegacyFinishSpan() override {
Span_.End();
}

// IRequestCtxBase
//
void AddAuditLogPart(const TStringBuf&, const TString&) override {
Expand All @@ -889,6 +908,7 @@ class TGRpcRequestBiStreamWrapper
TMaybe<NRpcService::TRlPath> RlPath_;
bool RlAllowed_;
IGRpcProxyCounters::TPtr Counters_;
NWilson::TSpan Span_;
};

template <typename TDerived>
Expand Down Expand Up @@ -1147,8 +1167,8 @@ class TGRpcRequestWrapperImpl
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
return GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER);
NWilson::TTraceId GetWilsonTraceId() const override {
return Span_.GetTraceId();
}

const TMaybe<TString> GetSdkBuildInfo() const {
Expand Down Expand Up @@ -1277,6 +1297,12 @@ class TGRpcRequestWrapperImpl
return AuditLogParts;
}

void StartTracing(NWilson::TSpan&& span) override {
Span_ = std::move(span);
}

void LegacyFinishSpan() override {}

void ReplyGrpcError(grpc::StatusCode code, const TString& msg, const TString& details = "") {
Ctx_->ReplyError(code, msg, details);
}
Expand Down Expand Up @@ -1316,6 +1342,8 @@ class TGRpcRequestWrapperImpl
};
}

protected:
NWilson::TSpan Span_;
private:
TIntrusivePtr<NYdbGrpc::IRequestContextBase> Ctx_;
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_;
Expand Down Expand Up @@ -1393,6 +1421,8 @@ class TGrpcRequestCall
{ }

void Pass(const IFacilityProvider& facility) override {
this->Span_.End();

PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/grpc_services/base/iface.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <util/generic/fwd.h>
#include <ydb/library/actors/wilson/wilson_span.h>

namespace google::protobuf {
class Message;
Expand All @@ -21,7 +22,7 @@ using TAuditLogHook = std::function<void (ui32 status, const TAuditLogParts&)>;
class IRequestCtxBaseMtSafe {
public:
virtual TMaybe<TString> GetTraceId() const = 0;
virtual TMaybe<TString> GetOpenTelemetryTraceParent() const = 0;
virtual NWilson::TTraceId GetWilsonTraceId() const = 0;
// Returns client provided database name
virtual const TMaybe<TString> GetDatabaseName() const = 0;
// Returns "internal" token (result of ticket parser authentication)
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/grpc_request_check_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ class TGrpcRequestCheckActor

template <typename T>
void HandleAndDie(T& event) {
GrpcRequestBaseCtx_->LegacyFinishSpan();
TGRpcRequestProxyHandleMethods::Handle(event, TlsActivationContext->AsActorContext());
TBase::PassAway();
}
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/grpc_services/grpc_request_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ydb/core/grpc_services/counters/proxy_counters.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/scheme_board/scheme_board.h>
#include <ydb/library/wilson_ids/wilson.h>

#include <shared_mutex>

Expand Down Expand Up @@ -79,19 +80,23 @@ class TGRpcRequestProxyImpl
void HandleSchemeBoard(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev);
void ReplayEvents(const TString& databaseName, const TActorContext& ctx);

void StartTracing(IRequestProxyCtx& ctx);

static bool IsAuthStateOK(const IRequestProxyCtx& ctx);

template <typename TEvent>
void Handle(TAutoPtr<TEventHandle<TEvent>>& event, const TActorContext& ctx) {
IRequestProxyCtx* requestBaseCtx = event->Get();
if (ValidateAndReplyOnError(requestBaseCtx)) {
requestBaseCtx->LegacyFinishSpan();
TGRpcRequestProxyHandleMethods::Handle(event, ctx);
}
}

void Handle(TEvListEndpointsRequest::TPtr& event, const TActorContext& ctx) {
IRequestProxyCtx* requestBaseCtx = event->Get();
if (ValidateAndReplyOnError(requestBaseCtx)) {
requestBaseCtx->LegacyFinishSpan();
TGRpcRequestProxy::Handle(event, ctx);
}
}
Expand Down Expand Up @@ -145,6 +150,9 @@ class TGRpcRequestProxyImpl
return;
}


//StartTracing(*requestBaseCtx);

if (IsAuthStateOK(*requestBaseCtx)) {
Handle(event, ctx);
return;
Expand Down Expand Up @@ -401,6 +409,12 @@ bool TGRpcRequestProxyImpl::IsAuthStateOK(const IRequestProxyCtx& ctx) {
state.NeedAuth == false && !ctx.GetYdbToken();
}

void TGRpcRequestProxyImpl::StartTracing(IRequestProxyCtx& ctx) {
auto traceId = NWilson::TTraceId::NewTraceId(15, Max<ui32>());
NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy");
ctx.StartTracing(std::move(grpcRequestProxySpan));
}

void TGRpcRequestProxyImpl::HandleSchemeBoard(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx) {
TString databaseName = ev->Get()->Path;
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "SchemeBoardUpdate " << databaseName);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx {
return Nothing();
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
return Nothing();
NWilson::TTraceId GetWilsonTraceId() const override {
return {};
}

TInstant GetDeadline() const override {
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/wilson_ids/wilson.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ namespace NKikimr {
};
};

struct TWilsonGrpc {
enum {
RequestProxy = 9,
};
};

} // NKikimr