Skip to content

Commit 6440f7c

Browse files
authored
Merge e3b70df into 36d42e5
2 parents 36d42e5 + e3b70df commit 6440f7c

File tree

8 files changed

+57
-21
lines changed

8 files changed

+57
-21
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -973,13 +973,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
973973
sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate();
974974
sslOpts.SetSslData(sslData);
975975

976-
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts) });
976+
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });
977977

978978
fillFn(grpcConfig, *GRpcServers.back().second, sslOpts);
979979
}
980980

981981
if (grpcConfig.GetPort()) {
982-
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts) });
982+
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });
983983

984984
fillFn(grpcConfig, *GRpcServers.back().second, opts);
985985
}
@@ -996,7 +996,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
996996
xopts.SetEndpointId(ex.GetEndpointId());
997997
}
998998

999-
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) });
999+
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
10001000
fillFn(ex, *GRpcServers.back().second, xopts);
10011001
}
10021002

@@ -1035,7 +1035,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
10351035
Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set");
10361036
Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set");
10371037

1038-
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts) });
1038+
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
10391039
fillFn(ex, *GRpcServers.back().second, xopts);
10401040
}
10411041
}

ydb/core/grpc_streaming/grpc_streaming_ut.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,13 @@ class TGRpcTestServer {
9595

9696
Server->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
9797

98+
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());
99+
98100
NYdbGrpc::TServerOptions options;
99101
options.SetPort(grpc);
100-
GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options));
102+
GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options, counters));
101103

102104
auto* as = Server->GetRuntime()->GetAnyNodeActorSystem();
103-
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());
104105

105106
GRpcServer->AddService(new TStreamingService<TImplActor>(as, counters));
106107
GRpcServer->Start();

ydb/core/http_proxy/ut/datastreams_fixture.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
888888
actorId = as->Register(NKikimr::NHttpProxy::CreateHttpProxy(httpProxyConfig));
889889
as->RegisterLocalService(MakeHttpProxyID(), actorId);
890890

891-
GRpcServer = MakeHolder<NYdbGrpc::TGRpcServer>(opts);
891+
GRpcServer = MakeHolder<NYdbGrpc::TGRpcServer>(opts, Counters);
892892
GRpcServer->AddService(new NKikimr::NHttpProxy::TGRpcDiscoveryService(as, credentialsProvider, Counters));
893893
GRpcServer->Start();
894894

ydb/core/testlib/test_client.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,10 @@ namespace Tests {
346346
}
347347

348348
void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) {
349-
GRpcServer.reset(new NYdbGrpc::TGRpcServer(options));
349+
GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
350+
auto& counters = GRpcServerRootCounters;
351+
352+
GRpcServer.reset(new NYdbGrpc::TGRpcServer(options, counters));
350353
auto grpcService = new NGRpcProxy::TGRpcService();
351354

352355
auto system(Runtime->GetActorSystem(grpcServiceNodeId));
@@ -378,9 +381,6 @@ namespace Tests {
378381
auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId);
379382
system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon);
380383

381-
GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
382-
auto& counters = GRpcServerRootCounters;
383-
384384
// Setup discovery for typically used services on the node
385385
{
386386
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();

ydb/library/grpc/server/grpc_server.cpp

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include "grpc_server.h"
22

3+
#include <library/cpp/monlib/dynamic_counters/counters.h>
4+
#include <library/cpp/time_provider/monotonic.h>
5+
36
#include <util/string/join.h>
47
#include <util/generic/yexception.h>
58
#include <util/system/thread.h>
@@ -18,19 +21,31 @@
1821

1922
namespace NYdbGrpc {
2023

21-
using NThreading::TFuture;
22-
23-
static void PullEvents(grpc::ServerCompletionQueue* cq) {
24+
static void PullEvents(grpc::ServerCompletionQueue* cq, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
2425
TThread::SetCurrentThreadName("grpc_server");
26+
auto okCounter = counters->GetCounter("RequestExecuted", true);
27+
auto errorCounter = counters->GetCounter("RequestDestroyed", true);
28+
auto cpuTime = counters->GetCounter("ThreadCPU", true);
29+
30+
NMonotonic::TMonotonic lastCpuTimeTs = {};
2531
while (true) {
2632
void* tag; // uniquely identifies a request.
2733
bool ok;
2834

35+
auto now = NMonotonic::TMonotonic::Now();
36+
if (now - lastCpuTimeTs >= TDuration::Seconds(1)) {
37+
lastCpuTimeTs = now;
38+
*cpuTime = ThreadCPUTime();
39+
}
40+
2941
if (cq->Next(&tag, &ok)) {
3042
IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
3143

32-
if (!ev->Execute(ok)) {
44+
if (ev->Execute(ok)) {
45+
okCounter->Inc();
46+
} else {
3347
ev->DestroyRequest();
48+
errorCounter->Inc();
3449
}
3550
} else {
3651
break;
@@ -103,10 +118,17 @@ void TGrpcServiceProtectiable::DecRequest() {
103118
}
104119
}
105120

106-
TGRpcServer::TGRpcServer(const TServerOptions& opts)
121+
TGRpcServer::TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
107122
: Options_(opts)
123+
, Counters_(std::move(counters))
108124
, Limiter_(Options_.MaxGlobalRequestInFlight)
109-
{}
125+
{
126+
// TODO: remove after migration
127+
if (!Counters_) {
128+
// make a temp stub
129+
Counters_.Reset(new ::NMonitoring::TDynamicCounters());
130+
}
131+
}
110132

111133
TGRpcServer::~TGRpcServer() {
112134
Y_ABORT_UNLESS(Ts.empty());
@@ -237,10 +259,12 @@ void TGRpcServer::Start() {
237259
}
238260

239261
Ts.reserve(Options_.WorkerThreads);
262+
auto grpcCounters = Counters_->GetSubgroup("counters", "grpc");
240263
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
241264
auto* cq = &CQS_[i % CQS_.size()];
242-
Ts.push_back(SystemThreadFactory()->Run([cq] {
243-
PullEvents(cq->get());
265+
auto workerCounters = grpcCounters->GetSubgroup("worker", ToString(i));
266+
Ts.push_back(SystemThreadFactory()->Run([cq, workerCounters] {
267+
PullEvents(cq->get(), std::move(workerCounters));
244268
}));
245269
}
246270

ydb/library/grpc/server/grpc_server.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
#include <grpcpp/grpcpp.h>
2020

21+
namespace NMonitoring {
22+
struct TDynamicCounters;
23+
} // NMonitoring
24+
2125
namespace NYdbGrpc {
2226

2327
struct TSslData {
@@ -349,8 +353,11 @@ class TGrpcServiceBase: public TGrpcServiceProtectiable {
349353
class TGRpcServer {
350354
public:
351355
using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
352-
TGRpcServer(const TServerOptions& opts);
356+
357+
// TODO: remove default nullptr after migration
358+
TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = nullptr);
353359
~TGRpcServer();
360+
354361
void AddService(IGRpcServicePtr service);
355362
void Start();
356363
// Send stop to registred services and call Shutdown on grpc server
@@ -365,6 +372,7 @@ class TGRpcServer {
365372
using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
366373

367374
const TServerOptions Options_;
375+
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
368376
std::unique_ptr<grpc::Server> Server_;
369377
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
370378
TVector<IThreadRef> Ts;

ydb/library/yql/providers/dq/service/service_node.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
#include <ydb/library/grpc/server/actors/logger.h>
1010

11+
#include <library/cpp/monlib/dynamic_counters/counters.h>
12+
1113
#include <utility>
1214

1315
namespace NYql {
@@ -149,7 +151,7 @@ namespace NYql {
149151
})
150152
.SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER
151153

152-
Server = MakeHolder<TGRpcServer>(options);
154+
Server = MakeHolder<TGRpcServer>(options, MakeIntrusive<::NMonitoring::TDynamicCounters>());
153155
Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories));
154156
Server->AddService(Service);
155157
Server->Start();

ydb/library/yql/providers/dq/service/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ PEERDIR(
1414
library/cpp/build_info
1515
ydb/library/grpc/server
1616
ydb/library/grpc/server/actors
17+
library/cpp/monlib/dynamic_counters
1718
library/cpp/svnversion
1819
library/cpp/threading/future
1920
yql/essentials/sql

0 commit comments

Comments
 (0)