Skip to content

Commit 1dd2b99

Browse files
authored
Add GRPC thread CPU time metric (#11772)
1 parent 4d17f1a commit 1dd2b99

File tree

5 files changed

+49
-17
lines changed

5 files changed

+49
-17
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -973,13 +973,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
973973
sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate();
974974
sslOpts.SetSslData(sslData);
975975

976-
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts) });
976+
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });
977977

978978
fillFn(grpcConfig, *GRpcServers.back().second, sslOpts);
979979
}
980980

981981
if (grpcConfig.GetPort()) {
982-
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts) });
982+
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });
983983

984984
fillFn(grpcConfig, *GRpcServers.back().second, opts);
985985
}
@@ -996,7 +996,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
996996
xopts.SetEndpointId(ex.GetEndpointId());
997997
}
998998

999-
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) });
999+
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
10001000
fillFn(ex, *GRpcServers.back().second, xopts);
10011001
}
10021002

@@ -1035,7 +1035,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
10351035
Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set");
10361036
Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set");
10371037

1038-
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts) });
1038+
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
10391039
fillFn(ex, *GRpcServers.back().second, xopts);
10401040
}
10411041
}

ydb/core/grpc_streaming/grpc_streaming_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,13 @@ class TGRpcTestServer {
9595

9696
Server->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
9797

98+
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());
99+
98100
NYdbGrpc::TServerOptions options;
99101
options.SetPort(grpc);
100102
GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options));
101103

102104
auto* as = Server->GetRuntime()->GetAnyNodeActorSystem();
103-
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());
104105

105106
GRpcServer->AddService(new TStreamingService<TImplActor>(as, counters));
106107
GRpcServer->Start();

ydb/core/testlib/test_client.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,9 @@ namespace Tests {
347347
}
348348

349349
void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) {
350+
GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
351+
auto& counters = GRpcServerRootCounters;
352+
350353
GRpcServer.reset(new NYdbGrpc::TGRpcServer(options));
351354
auto grpcService = new NGRpcProxy::TGRpcService();
352355

@@ -379,9 +382,6 @@ namespace Tests {
379382
auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId);
380383
system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon);
381384

382-
GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
383-
auto& counters = GRpcServerRootCounters;
384-
385385
// Setup discovery for typically used services on the node
386386
{
387387
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();

ydb/library/grpc/server/grpc_server.cpp

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include "grpc_server.h"
22

3+
#include <library/cpp/monlib/dynamic_counters/counters.h>
4+
#include <library/cpp/time_provider/monotonic.h>
5+
36
#include <util/string/join.h>
47
#include <util/generic/yexception.h>
58
#include <util/system/thread.h>
@@ -18,19 +21,31 @@
1821

1922
namespace NYdbGrpc {
2023

21-
using NThreading::TFuture;
22-
23-
static void PullEvents(grpc::ServerCompletionQueue* cq) {
24+
static void PullEvents(grpc::ServerCompletionQueue* cq, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
2425
TThread::SetCurrentThreadName("grpc_server");
26+
auto okCounter = counters->GetCounter("RequestExecuted", true);
27+
auto errorCounter = counters->GetCounter("RequestDestroyed", true);
28+
auto cpuTime = counters->GetCounter("ThreadCPU", true);
29+
30+
NMonotonic::TMonotonic lastCpuTimeTs = {};
2531
while (true) {
2632
void* tag; // uniquely identifies a request.
2733
bool ok;
2834

35+
auto now = NMonotonic::TMonotonic::Now();
36+
if (now - lastCpuTimeTs >= TDuration::Seconds(1)) {
37+
lastCpuTimeTs = now;
38+
*cpuTime = ThreadCPUTime();
39+
}
40+
2941
if (cq->Next(&tag, &ok)) {
3042
IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
3143

32-
if (!ev->Execute(ok)) {
44+
if (ev->Execute(ok)) {
45+
okCounter->Inc();
46+
} else {
3347
ev->DestroyRequest();
48+
errorCounter->Inc();
3449
}
3550
} else {
3651
break;
@@ -103,10 +118,16 @@ void TGrpcServiceProtectiable::DecRequest() {
103118
}
104119
}
105120

106-
TGRpcServer::TGRpcServer(const TServerOptions& opts)
121+
TGRpcServer::TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
107122
: Options_(opts)
123+
, Counters_(std::move(counters))
108124
, Limiter_(Options_.MaxGlobalRequestInFlight)
109-
{}
125+
{
126+
if (!Counters_) {
127+
// make a stub to simplify code
128+
Counters_.Reset(new ::NMonitoring::TDynamicCounters());
129+
}
130+
}
110131

111132
TGRpcServer::~TGRpcServer() {
112133
Y_ABORT_UNLESS(Ts.empty());
@@ -237,10 +258,12 @@ void TGRpcServer::Start() {
237258
}
238259

239260
Ts.reserve(Options_.WorkerThreads);
261+
auto grpcCounters = Counters_->GetSubgroup("counters", "grpc");
240262
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
241263
auto* cq = &CQS_[i % CQS_.size()];
242-
Ts.push_back(SystemThreadFactory()->Run([cq] {
243-
PullEvents(cq->get());
264+
auto workerCounters = grpcCounters->GetSubgroup("worker", ToString(i));
265+
Ts.push_back(SystemThreadFactory()->Run([cq, workerCounters] {
266+
PullEvents(cq->get(), std::move(workerCounters));
244267
}));
245268
}
246269

ydb/library/grpc/server/grpc_server.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
#include <grpcpp/grpcpp.h>
2020

21+
namespace NMonitoring {
22+
struct TDynamicCounters;
23+
} // NMonitoring
24+
2125
namespace NYdbGrpc {
2226

2327
struct TSslData {
@@ -349,8 +353,11 @@ class TGrpcServiceBase: public TGrpcServiceProtectiable {
349353
class TGRpcServer {
350354
public:
351355
using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
352-
TGRpcServer(const TServerOptions& opts);
356+
357+
// TODO: remove default nullptr after migration
358+
TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = nullptr);
353359
~TGRpcServer();
360+
354361
void AddService(IGRpcServicePtr service);
355362
void Start();
356363
// Send stop to registred services and call Shutdown on grpc server
@@ -365,6 +372,7 @@ class TGRpcServer {
365372
using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
366373

367374
const TServerOptions Options_;
375+
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
368376
std::unique_ptr<grpc::Server> Server_;
369377
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
370378
TVector<IThreadRef> Ts;

0 commit comments

Comments
 (0)