Skip to content

Commit 00bc4be

Browse files
committed
Added proper request finishing in grpc request proxy (ydb-platform#2568)
1 parent aaf38c3 commit 00bc4be

File tree

6 files changed

+74
-39
lines changed

6 files changed

+74
-39
lines changed

ydb/core/grpc_services/base/base.h

+7-7
Original file line numberDiff line numberDiff line change
@@ -365,9 +365,9 @@ class IRequestProxyCtx : public virtual IRequestCtxBase {
365365
virtual void ReplyUnauthenticated(const TString& msg = "") = 0;
366366
virtual void ReplyUnavaliable() = 0;
367367

368-
//tracing
368+
// tracing
369369
virtual void StartTracing(NWilson::TSpan&& span) = 0;
370-
virtual void LegacyFinishSpan() = 0;
370+
virtual void FinishSpan() = 0;
371371

372372
// Used for per-type sampling
373373
virtual NJaegerTracing::TRequestDiscriminator GetRequestDiscriminator() const {
@@ -491,7 +491,7 @@ class TRefreshTokenImpl
491491
}
492492

493493
void StartTracing(NWilson::TSpan&& /*span*/) override {}
494-
void LegacyFinishSpan() override {}
494+
void FinishSpan() override {}
495495

496496
void UpdateAuthState(NYdbGrpc::TAuthState::EAuthState state) override {
497497
State_.State = state;
@@ -893,7 +893,7 @@ class TGRpcRequestBiStreamWrapper
893893
Span_ = std::move(span);
894894
}
895895

896-
void LegacyFinishSpan() override {
896+
void FinishSpan() override {
897897
Span_.End();
898898
}
899899

@@ -1307,7 +1307,9 @@ class TGRpcRequestWrapperImpl
13071307
Span_ = std::move(span);
13081308
}
13091309

1310-
void LegacyFinishSpan() override {}
1310+
void FinishSpan() override {
1311+
Span_.End();
1312+
}
13111313

13121314
void ReplyGrpcError(grpc::StatusCode code, const TString& msg, const TString& details = "") {
13131315
Ctx_->ReplyError(code, msg, details);
@@ -1427,8 +1429,6 @@ class TGrpcRequestCall
14271429
{ }
14281430

14291431
void Pass(const IFacilityProvider& facility) override {
1430-
this->Span_.End();
1431-
14321432
try {
14331433
PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
14341434
} catch (const std::exception& ex) {

ydb/core/grpc_services/grpc_request_check_actor.h

+10-2
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ class TGrpcRequestCheckActor
8989
, Request_(std::move(request))
9090
, Counters_(counters)
9191
, SecurityObject_(std::move(securityObject))
92+
, GrpcRequestBaseCtx_(Request_->Get())
9293
, SkipCheckConnectRigths_(skipCheckConnectRigths)
9394
, FacilityProvider_(facilityProvider)
9495
{
95-
GrpcRequestBaseCtx_ = Request_->Get();
9696
TMaybe<TString> authToken = GrpcRequestBaseCtx_->GetYdbToken();
9797
if (authToken) {
9898
TString peerName = GrpcRequestBaseCtx_->GetPeerName();
@@ -225,6 +225,7 @@ class TGrpcRequestCheckActor
225225
}
226226

227227
void HandlePoison(TEvents::TEvPoisonPill::TPtr&) {
228+
GrpcRequestBaseCtx_->FinishSpan();
228229
TBase::PassAway();
229230
}
230231

@@ -374,29 +375,34 @@ class TGrpcRequestCheckActor
374375
void ReplyUnauthorizedAndDie(const NYql::TIssue& issue) {
375376
GrpcRequestBaseCtx_->RaiseIssue(issue);
376377
GrpcRequestBaseCtx_->ReplyWithYdbStatus(Ydb::StatusIds::UNAUTHORIZED);
378+
GrpcRequestBaseCtx_->FinishSpan();
377379
TBase::PassAway();
378380
}
379381

380382
void ReplyUnavailableAndDie(const NYql::TIssue& issue) {
381383
GrpcRequestBaseCtx_->RaiseIssue(issue);
382384
GrpcRequestBaseCtx_->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
385+
GrpcRequestBaseCtx_->FinishSpan();
383386
TBase::PassAway();
384387
}
385388

386389
void ReplyUnavailableAndDie(const NYql::TIssues& issue) {
387390
GrpcRequestBaseCtx_->RaiseIssues(issue);
388391
GrpcRequestBaseCtx_->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
392+
GrpcRequestBaseCtx_->FinishSpan();
389393
TBase::PassAway();
390394
}
391395

392396
void ReplyUnauthenticatedAndDie() {
393397
GrpcRequestBaseCtx_->ReplyUnauthenticated("Unknown database");
398+
GrpcRequestBaseCtx_->FinishSpan();
394399
TBase::PassAway();
395400
}
396401

397402
void ReplyOverloadedAndDie(const NYql::TIssue& issue) {
398403
GrpcRequestBaseCtx_->RaiseIssue(issue);
399404
GrpcRequestBaseCtx_->ReplyWithYdbStatus(Ydb::StatusIds::OVERLOADED);
405+
GrpcRequestBaseCtx_->FinishSpan();
400406
TBase::PassAway();
401407
}
402408

@@ -413,6 +419,7 @@ class TGrpcRequestCheckActor
413419
// and authorization check against the database
414420
AuditRequest(GrpcRequestBaseCtx_, CheckedDatabaseName_, TBase::GetUserSID());
415421

422+
GrpcRequestBaseCtx_->FinishSpan();
416423
event->Release().Release()->Pass(*this);
417424
TBase::PassAway();
418425
}
@@ -428,12 +435,13 @@ class TGrpcRequestCheckActor
428435

429436
template <typename T>
430437
void HandleAndDie(T& event) {
431-
GrpcRequestBaseCtx_->LegacyFinishSpan();
438+
GrpcRequestBaseCtx_->FinishSpan();
432439
TGRpcRequestProxyHandleMethods::Handle(event, TlsActivationContext->AsActorContext());
433440
TBase::PassAway();
434441
}
435442

436443
void ReplyBackAndDie() {
444+
GrpcRequestBaseCtx_->FinishSpan();
437445
TlsActivationContext->Send(Request_->Forward(Owner_));
438446
TBase::PassAway();
439447
}

ydb/core/grpc_services/grpc_request_proxy.cpp

+41-20
Original file line numberDiff line numberDiff line change
@@ -90,22 +90,23 @@ class TGRpcRequestProxyImpl
9090
void Handle(TAutoPtr<TEventHandle<TEvent>>& event, const TActorContext& ctx) {
9191
IRequestProxyCtx* requestBaseCtx = event->Get();
9292
if (ValidateAndReplyOnError(requestBaseCtx)) {
93-
requestBaseCtx->LegacyFinishSpan();
93+
requestBaseCtx->FinishSpan();
9494
TGRpcRequestProxyHandleMethods::Handle(event, ctx);
9595
}
9696
}
9797

9898
void Handle(TEvListEndpointsRequest::TPtr& event, const TActorContext& ctx) {
9999
IRequestProxyCtx* requestBaseCtx = event->Get();
100100
if (ValidateAndReplyOnError(requestBaseCtx)) {
101-
requestBaseCtx->LegacyFinishSpan();
101+
requestBaseCtx->FinishSpan();
102102
TGRpcRequestProxy::Handle(event, ctx);
103103
}
104104
}
105105

106106
void Handle(TEvProxyRuntimeEvent::TPtr& event, const TActorContext&) {
107107
IRequestProxyCtx* requestBaseCtx = event->Get();
108108
if (ValidateAndReplyOnError(requestBaseCtx)) {
109+
requestBaseCtx->FinishSpan();
109110
event->Release().Release()->Pass(*this);
110111
}
111112
}
@@ -137,34 +138,32 @@ class TGRpcRequestProxyImpl
137138
return true;
138139
}
139140

140-
template <typename TEvent>
141-
void PreHandle(TAutoPtr<TEventHandle<TEvent>>& event, const TActorContext& ctx) {
141+
template<class TEvent>
142+
bool PreHandleImpl(TAutoPtr<TEventHandle<TEvent>>& event, const TActorContext& ctx) {
142143
IRequestProxyCtx* requestBaseCtx = event->Get();
143-
144-
LogRequest(event);
145-
146144
if (!SchemeCache) {
147145
const TString error = "Grpc proxy is not ready to accept request, no proxy service";
148146
LOG_ERROR_S(ctx, NKikimrServices::GRPC_SERVER, error);
149147
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_TXPROXY_ERROR, error);
150148
requestBaseCtx->RaiseIssue(issue);
151149
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
152-
return;
150+
return true;
153151
}
154152

155-
156-
MaybeStartTracing(*requestBaseCtx);
153+
if (!IsHandlingDeferred) {
154+
MaybeStartTracing(*requestBaseCtx);
155+
}
157156

158157
if (IsAuthStateOK(*requestBaseCtx)) {
159158
Handle(event, ctx);
160-
return;
159+
return false;
161160
}
162161

163162
auto state = requestBaseCtx->GetAuthState();
164163

165164
if (state.State == NYdbGrpc::TAuthState::AS_FAIL) {
166165
requestBaseCtx->ReplyUnauthenticated();
167-
return;
166+
return true;
168167
}
169168

170169
if (state.State == NYdbGrpc::TAuthState::AS_UNAVAILABLE) {
@@ -173,7 +172,7 @@ class TGRpcRequestProxyImpl
173172
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_AUTH_UNAVAILABLE, error);
174173
requestBaseCtx->RaiseIssue(issue);
175174
requestBaseCtx->ReplyUnavaliable();
176-
return;
175+
return true;
177176
}
178177

179178
TString databaseName;
@@ -190,7 +189,7 @@ class TGRpcRequestProxyImpl
190189
} else {
191190
if (!AllowYdbRequestsWithoutDatabase && DynamicNode) {
192191
requestBaseCtx->ReplyUnauthenticated("Requests without specified database is not allowed");
193-
return;
192+
return true;
194193
} else {
195194
databaseName = RootDatabase;
196195
skipResourceCheck = true;
@@ -199,8 +198,9 @@ class TGRpcRequestProxyImpl
199198
}
200199
if (databaseName.empty()) {
201200
Counters->IncDatabaseUnavailableCounter();
201+
requestBaseCtx->FinishSpan();
202202
requestBaseCtx->ReplyUnauthenticated("Empty database name");
203-
return;
203+
return true;
204204
}
205205
auto it = Databases.find(databaseName);
206206
if (it != Databases.end() && it->second.IsDatabaseReady()) {
@@ -214,8 +214,9 @@ class TGRpcRequestProxyImpl
214214
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, error);
215215
requestBaseCtx->RaiseIssue(issue);
216216
requestBaseCtx->ReplyUnavaliable();
217+
return true;
217218
}
218-
return;
219+
return false;
219220
}
220221
}
221222

@@ -234,7 +235,7 @@ class TGRpcRequestProxyImpl
234235
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error);
235236
requestBaseCtx->RaiseIssue(issue);
236237
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAUTHORIZED);
237-
return;
238+
return true;
238239
}
239240
}
240241
if (domain.GetDomainState().GetDiskQuotaExceeded()) {
@@ -245,15 +246,15 @@ class TGRpcRequestProxyImpl
245246
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, "database unavailable");
246247
requestBaseCtx->RaiseIssue(issue);
247248
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
248-
return;
249+
return true;
249250
}
250251

251252
if (requestBaseCtx->IsClientLost()) {
252253
// Any status here
253254
LOG_DEBUG(*TlsActivationContext, NKikimrServices::GRPC_SERVER,
254255
"Client was disconnected before processing request (grpc request proxy)");
255256
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
256-
return;
257+
return true;
257258
}
258259

259260
Register(CreateGrpcRequestCheckActor<TEvent>(SelfId(),
@@ -263,13 +264,23 @@ class TGRpcRequestProxyImpl
263264
Counters,
264265
skipCheckConnectRigths,
265266
this));
266-
return;
267+
return false;
267268
}
268269

269270
// in case we somehow skipped all auth checks
270271
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_TXPROXY_ERROR, "Can't authenticate request");
271272
requestBaseCtx->RaiseIssue(issue);
272273
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::BAD_REQUEST);
274+
return true;
275+
}
276+
277+
template <typename TEvent>
278+
void PreHandle(TAutoPtr<TEventHandle<TEvent>>& event, const TActorContext& ctx) {
279+
LogRequest(event);
280+
281+
if (PreHandleImpl(event, ctx)) {
282+
event->Get()->FinishSpan();
283+
}
273284
}
274285

275286
void ForgetDatabase(const TString& database);
@@ -286,11 +297,14 @@ class TGRpcRequestProxyImpl
286297
}
287298

288299
virtual void PassAway() override {
300+
auto prevIsHandlingDeferred = std::exchange(IsHandlingDeferred, true);
289301
for (auto& [_, queue] : DeferredEvents) {
290302
for (TEventReqHolder& req : queue) {
291303
req.Ctx->ReplyUnavaliable();
304+
req.Ctx->FinishSpan();
292305
}
293306
}
307+
IsHandlingDeferred = prevIsHandlingDeferred;
294308

295309
for (const auto& [_, actor] : Subscribers) {
296310
Send(actor, new TEvents::TEvPoisonPill());
@@ -314,6 +328,7 @@ class TGRpcRequestProxyImpl
314328
TString RootDatabase;
315329
IGRpcProxyCounters::TPtr Counters;
316330
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> TracingControl;
331+
bool IsHandlingDeferred = false; // A crutch for proper tracing
317332
};
318333

319334
void TGRpcRequestProxyImpl::Bootstrap(const TActorContext& ctx) {
@@ -352,6 +367,7 @@ void TGRpcRequestProxyImpl::Bootstrap(const TActorContext& ctx) {
352367
}
353368

354369
void TGRpcRequestProxyImpl::ReplayEvents(const TString& databaseName, const TActorContext&) {
370+
auto prevIsHandlingDeferred = std::exchange(IsHandlingDeferred, true);
355371
auto itDeferredEvents = DeferredEvents.find(databaseName);
356372
if (itDeferredEvents != DeferredEvents.end()) {
357373
std::deque<TEventReqHolder>& queue = itDeferredEvents->second;
@@ -365,6 +381,7 @@ void TGRpcRequestProxyImpl::ReplayEvents(const TString& databaseName, const TAct
365381
DeferredEvents.erase(itDeferredEvents);
366382
}
367383
}
384+
IsHandlingDeferred = prevIsHandlingDeferred;
368385
}
369386

370387
void TGRpcRequestProxyImpl::HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
@@ -423,6 +440,7 @@ void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) {
423440
if (auto database = ctx.GetDatabaseName()) {
424441
grpcRequestProxySpan.Attribute("database", std::move(*database));
425442
}
443+
grpcRequestProxySpan.Attribute("request_type", ctx.GetRequestName());
426444
ctx.StartTracing(std::move(grpcRequestProxySpan));
427445
}
428446
}
@@ -479,17 +497,20 @@ void TGRpcRequestProxyImpl::ForgetDatabase(const TString& database) {
479497
Send(itSubscriber->second, new TEvents::TEvPoisonPill());
480498
Subscribers.erase(itSubscriber);
481499
}
500+
auto prevIsHandlingDeferred = std::exchange(IsHandlingDeferred, true);
482501
auto itDeferredEvents = DeferredEvents.find(database);
483502
if (itDeferredEvents != DeferredEvents.end()) {
484503
auto& queue(itDeferredEvents->second);
485504
while (!queue.empty()) {
486505
Counters->IncDatabaseUnavailableCounter();
487506
queue.front().Ctx->ReplyUnauthenticated("Unknown database");
507+
queue.front().Ctx->FinishSpan();
488508
queue.pop_front();
489509
}
490510
DeferredEvents.erase(itDeferredEvents);
491511
}
492512
Databases.erase(database);
513+
IsHandlingDeferred = prevIsHandlingDeferred;
493514
}
494515

495516
void TGRpcRequestProxyImpl::SubscribeToDatabase(const TString& database) {

ydb/core/grpc_services/rpc_calls.h

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ inline bool ValidateAndReplyOnError(IRequestProxyCtx* ctx) {
5050
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_API_VALIDATION_ERROR, validationError);
5151
ctx->RaiseIssue(issue);
5252
ctx->ReplyWithYdbStatus(Ydb::StatusIds::BAD_REQUEST);
53+
ctx->FinishSpan();
5354
return false;
5455
} else {
5556
return true;

ydb/library/actors/wilson/wilson_span.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "wilson_span.h"
22
#include "wilson_uploader.h"
3+
#include <util/system/backtrace.h>
34
#include <google/protobuf/text_format.h>
45

56
namespace NWilson {
@@ -62,6 +63,19 @@ namespace NWilson {
6263
Data->Sent = true;
6364
}
6465

66+
TSpan& TSpan::operator=(TSpan&& other) {
67+
if (this != &other) {
68+
if (Y_UNLIKELY(*this)) {
69+
TStringStream err;
70+
err << "TSpan instance incorrectly overwritten at:\n";
71+
FormatBackTrace(&err);
72+
EndError(std::move(err.Str()));
73+
}
74+
Data = std::exchange(other.Data, nullptr);
75+
}
76+
return *this;
77+
}
78+
6579
const TSpan TSpan::Empty;
6680

6781
} // NWilson

0 commit comments

Comments
 (0)