Skip to content

Commit cd6e4b4

Browse files
authored
Support tracing for long grpc stream session requests (#12493)
1 parent db99e50 commit cd6e4b4

File tree

11 files changed

+263
-104
lines changed

11 files changed

+263
-104
lines changed

ydb/core/cms/console/jaeger_tracing_configurator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ TSettings<double, TWithTag<TThrottlingSettings>> TJaegerTracingConfigurator::Get
140140
if (fraction < 0 || fraction > 1) {
141141
ALOG_ERROR(NKikimrServices::CMS_CONFIGS, "provided fraction " << fraction
142142
<< " violated range [0; 1]. Clamping it to the range");
143-
fraction = std::min(1.0, std::max(0.0, fraction));
143+
fraction = std::clamp(fraction, 0.0, 1.0);
144144
}
145145

146146
TSamplingRule<double, TWithTag<TThrottlingSettings>> rule {

ydb/core/cms/console/jaeger_tracing_configurator_ut.cpp

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ Y_UNIT_TEST_SUITE(TJaegerTracingConfiguratorTests) {
206206
};
207207

208208
{
209-
size_t sampled = 0;
210209
size_t traced = 0;
211210
for (size_t i = 0; i < 1000; ++i) {
212211
auto [state, level] = controls.HandleTracing(true, RandomChoice(discriminators));
@@ -215,17 +214,15 @@ Y_UNIT_TEST_SUITE(TJaegerTracingConfiguratorTests) {
215214
case TTracingControls::OFF:
216215
break;
217216
case TTracingControls::SAMPLED:
218-
UNIT_ASSERT_EQUAL(level, 5);
219-
++sampled;
217+
UNIT_ASSERT(false); // We provide external trace id, => new sampled trace must not be generated
220218
break;
221219
case TTracingControls::EXTERNAL:
222220
++traced;
223221
break;
224222
}
225-
timeProvider->Advance(TDuration::MilliSeconds(250));
223+
timeProvider->Advance(TDuration::MilliSeconds(250)); // 4 requests per second
226224
}
227-
UNIT_ASSERT_EQUAL(traced, 250);
228-
UNIT_ASSERT(sampled >= 110 && sampled <= 135);
225+
UNIT_ASSERT_C(traced >= 250 + 125 - 50 && traced <= 250 + 125 + 50, traced); // 1 of each 4 requests external traced + 1 of each 3 other requests sampled (but not greater than 0.5 of them according to throttling)
229226
}
230227
timeProvider->Advance(TDuration::Minutes(1));
231228

@@ -249,7 +246,62 @@ Y_UNIT_TEST_SUITE(TJaegerTracingConfiguratorTests) {
249246
}
250247
timeProvider->Advance(TDuration::Seconds(1));
251248
}
252-
UNIT_ASSERT(sampled >= 210 && sampled <= 300);
249+
UNIT_ASSERT_C(sampled >= 210 && sampled <= 300, sampled);
250+
}
251+
timeProvider->Advance(TDuration::Minutes(1));
252+
}
253+
254+
Y_UNIT_TEST(ExternalTracePlusSampling) {
255+
TTenantTestRuntime runtime(DefaultConsoleTestConfig());
256+
auto timeProvider = MakeIntrusive<TTimeProviderMock>(TInstant::Now());
257+
auto [controls, configurator] = CreateSamplingThrottlingConfigurator(10, timeProvider);
258+
NKikimrConfig::TTracingConfig cfg;
259+
{
260+
auto rule = cfg.AddExternalThrottling();
261+
rule->SetMaxTracesBurst(0);
262+
rule->SetMaxTracesPerMinute(60);
263+
}
264+
{
265+
auto rule = cfg.AddSampling();
266+
rule->SetFraction(0.5);
267+
rule->SetLevel(5);
268+
rule->SetMaxTracesBurst(10);
269+
rule->SetMaxTracesPerMinute(90);
270+
}
271+
InitJaegerTracingConfigurator(runtime, std::move(configurator), cfg);
272+
273+
std::array discriminators{
274+
TRequestDiscriminator{
275+
.RequestType = ERequestType::TABLE_READROWS,
276+
.Database = "/Root/test3",
277+
},
278+
TRequestDiscriminator{
279+
.RequestType = ERequestType::KEYVALUE_READ,
280+
},
281+
TRequestDiscriminator{
282+
.Database = "/Root/test2",
283+
},
284+
TRequestDiscriminator{},
285+
};
286+
287+
{
288+
size_t traced = 0;
289+
for (size_t i = 0; i < 1000; ++i) {
290+
auto [state, level] = controls.HandleTracing(true, RandomChoice(discriminators));
291+
292+
switch (state) {
293+
case TTracingControls::OFF:
294+
break;
295+
case TTracingControls::SAMPLED:
296+
UNIT_ASSERT(false); // We provide external trace id, => new sampled trace must not be generated
297+
break;
298+
case TTracingControls::EXTERNAL:
299+
++traced;
300+
break;
301+
}
302+
timeProvider->Advance(TDuration::MilliSeconds(250)); // 4 requests per second
303+
}
304+
UNIT_ASSERT_C(traced >= 250 + 375 - 75 && traced <= 250 + 375 + 75, traced); // 1 of each 4 requests external traced + 1.5 of each 3 other requests sampled
253305
}
254306
}
255307

ydb/core/grpc_services/base/base.h

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -732,10 +732,10 @@ inline TString MakeAuthError(const TString& in, NYql::TIssueManager& issues) {
732732
return out.Str();
733733
}
734734

735-
template <ui32 TRpcId, typename TReq, typename TResp, TRateLimiterMode RlMode = TRateLimiterMode::Off>
735+
template <ui32 TRpcId, typename TReq, typename TResp>
736736
class TGRpcRequestBiStreamWrapper
737737
: public IRequestProxyCtx
738-
, public TEventLocal<TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp, RlMode>, TRpcId>
738+
, public TEventLocal<TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp>, TRpcId>
739739
{
740740
private:
741741
void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
@@ -748,12 +748,11 @@ class TGRpcRequestBiStreamWrapper
748748
using TRequest = TReq;
749749
using TResponse = TResp;
750750
using IStreamCtx = NGRpcServer::IGRpcStreamingContext<TRequest, TResponse>;
751-
static constexpr TRateLimiterMode RateLimitMode = RlMode;
752751

753-
TGRpcRequestBiStreamWrapper(TIntrusivePtr<IStreamCtx> ctx, bool rlAllowed = true)
752+
TGRpcRequestBiStreamWrapper(TIntrusivePtr<IStreamCtx> ctx, TRequestAuxSettings auxSettings = {})
754753
: Ctx_(ctx)
755-
, RlAllowed_(rlAllowed)
756754
, TraceId(GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER))
755+
, AuxSettings(std::move(auxSettings))
757756
{
758757
if (!TraceId) {
759758
TraceId = UlidGen.Next().ToString();
@@ -766,11 +765,29 @@ class TGRpcRequestBiStreamWrapper
766765
}
767766

768767
TRateLimiterMode GetRlMode() const override {
769-
return RlAllowed_ ? RateLimitMode : TRateLimiterMode::Off;
768+
return AuxSettings.RlMode;
770769
}
771770

772-
bool TryCustomAttributeProcess(const NKikimrScheme::TEvDescribeSchemeResult&, ICheckerIface*) override {
773-
return false;
771+
bool TryCustomAttributeProcess(const NKikimrScheme::TEvDescribeSchemeResult& schemeData,
772+
ICheckerIface* iface) override
773+
{
774+
if (!AuxSettings.CustomAttributeProcessor) {
775+
return false;
776+
} else {
777+
AuxSettings.CustomAttributeProcessor(schemeData, iface);
778+
return true;
779+
}
780+
}
781+
782+
NJaegerTracing::TRequestDiscriminator GetRequestDiscriminator() const override {
783+
return {
784+
.RequestType = AuxSettings.RequestType,
785+
.Database = GetDatabaseName(),
786+
};
787+
}
788+
789+
bool IsAuditable() const override {
790+
return (AuxSettings.AuditMode == TAuditMode::Auditable) && !this->IsInternalCall();
774791
}
775792

776793
const TMaybe<TString> GetYdbToken() const override {
@@ -933,12 +950,12 @@ class TGRpcRequestBiStreamWrapper
933950
inline static const TString EmptySerializedTokenMessage_;
934951
NYql::TIssueManager IssueManager_;
935952
TMaybe<NRpcService::TRlPath> RlPath_;
936-
bool RlAllowed_;
937953
IGRpcProxyCounters::TPtr Counters_;
938954
NWilson::TSpan Span_;
939955
bool IsTracingDecided_ = false;
940956
TULIDGenerator UlidGen;
941957
TMaybe<TString> TraceId;
958+
const TRequestAuxSettings AuxSettings;
942959
};
943960

944961
template <typename TDerived>

ydb/core/grpc_services/rpc_calls.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoint
5050
using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStreamPing, Draft::Dummy::PingRequest, Draft::Dummy::PingResponse>;
5151
using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>;
5252
using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
53-
using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::RuManual>;
54-
using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::RuManual>;
55-
using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer, TRateLimiterMode::RuManual>;
53+
using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer>;
54+
using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>;
55+
using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer>;
5656
using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>;
5757
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
5858
//TODO: Change this to runtime dispatching!
@@ -121,8 +121,8 @@ void RefreshTokenSendRequest(const TActorContext& ctx, IEventBase* refreshTokenR
121121
void RefreshTokenReplyUnauthenticated(TActorId recipient, TActorId sender, NYql::TIssues&& issues);
122122
void RefreshTokenReplyUnavailable(TActorId recipient, NYql::TIssues&& issues);
123123

124-
template <ui32 TRpcId, typename TReq, typename TResp, TRateLimiterMode RlMode>
125-
void TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp, RlMode>::RefreshToken(const TString& token, const TActorContext& ctx, TActorId id) {
124+
template <ui32 TRpcId, typename TReq, typename TResp>
125+
void TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp>::RefreshToken(const TString& token, const TActorContext& ctx, TActorId id) {
126126
using TSelf = typename std::remove_pointer<decltype(this)>::type;
127127
using TRefreshToken = typename TRefreshTokenTypeForRequest<TSelf>::type;
128128
RefreshTokenSendRequest(ctx, new TRefreshToken(token, GetDatabaseName().GetOrElse(""), id));

ydb/core/jaeger_tracing/request_discriminator.cpp

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,98 @@ const TRequestDiscriminator TRequestDiscriminator::EMPTY {
77
.Database = NothingObject,
88
};
99

10+
extern const THashMap<TStringBuf, ERequestType> NameToRequestType = {
11+
{"KeyValue.CreateVolume", ERequestType::KEYVALUE_CREATEVOLUME},
12+
{"KeyValue.DropVolume", ERequestType::KEYVALUE_DROPVOLUME},
13+
{"KeyValue.AlterVolume", ERequestType::KEYVALUE_ALTERVOLUME},
14+
{"KeyValue.DescribeVolume", ERequestType::KEYVALUE_DESCRIBEVOLUME},
15+
{"KeyValue.ListLocalPartitions", ERequestType::KEYVALUE_LISTLOCALPARTITIONS},
16+
{"KeyValue.AcquireLock", ERequestType::KEYVALUE_ACQUIRELOCK},
17+
{"KeyValue.ExecuteTransaction", ERequestType::KEYVALUE_EXECUTETRANSACTION},
18+
{"KeyValue.Read", ERequestType::KEYVALUE_READ},
19+
{"KeyValue.ReadRange", ERequestType::KEYVALUE_READRANGE},
20+
{"KeyValue.ListRange", ERequestType::KEYVALUE_LISTRANGE},
21+
{"KeyValue.GetStorageChannelStatus", ERequestType::KEYVALUE_GETSTORAGECHANNELSTATUS},
22+
23+
{"Table.CreateSession", ERequestType::TABLE_CREATESESSION},
24+
{"Table.KeepAlive", ERequestType::TABLE_KEEPALIVE},
25+
{"Table.AlterTable", ERequestType::TABLE_ALTERTABLE},
26+
{"Table.CreateTable", ERequestType::TABLE_CREATETABLE},
27+
{"Table.DropTable", ERequestType::TABLE_DROPTABLE},
28+
{"Table.DescribeTable", ERequestType::TABLE_DESCRIBETABLE},
29+
{"Table.CopyTable", ERequestType::TABLE_COPYTABLE},
30+
{"Table.CopyTables", ERequestType::TABLE_COPYTABLES},
31+
{"Table.RenameTables", ERequestType::TABLE_RENAMETABLES},
32+
{"Table.ExplainDataQuery", ERequestType::TABLE_EXPLAINDATAQUERY},
33+
{"Table.ExecuteSchemeQuery", ERequestType::TABLE_EXECUTESCHEMEQUERY},
34+
{"Table.BeginTransaction", ERequestType::TABLE_BEGINTRANSACTION},
35+
{"Table.DescribeTableOptions", ERequestType::TABLE_DESCRIBETABLEOPTIONS},
36+
{"Table.DeleteSession", ERequestType::TABLE_DELETESESSION},
37+
{"Table.CommitTransaction", ERequestType::TABLE_COMMITTRANSACTION},
38+
{"Table.RollbackTransaction", ERequestType::TABLE_ROLLBACKTRANSACTION},
39+
{"Table.PrepareDataQuery", ERequestType::TABLE_PREPAREDATAQUERY},
40+
{"Table.ExecuteDataQuery", ERequestType::TABLE_EXECUTEDATAQUERY},
41+
{"Table.BulkUpsert", ERequestType::TABLE_BULKUPSERT},
42+
{"Table.StreamExecuteScanQuery", ERequestType::TABLE_STREAMEXECUTESCANQUERY},
43+
{"Table.StreamReadTable", ERequestType::TABLE_STREAMREADTABLE},
44+
{"Table.ReadRows", ERequestType::TABLE_READROWS},
45+
46+
{"Query.ExecuteQuery", ERequestType::QUERY_EXECUTEQUERY},
47+
{"Query.ExecuteScript", ERequestType::QUERY_EXECUTESCRIPT},
48+
{"Query.FetchScriptResults", ERequestType::QUERY_FETCHSCRIPTRESULTS},
49+
{"Query.CreateSession", ERequestType::QUERY_CREATESESSION},
50+
{"Query.DeleteSession", ERequestType::QUERY_DELETESESSION},
51+
{"Query.AttachSession", ERequestType::QUERY_ATTACHSESSION},
52+
{"Query.BeginTransaction", ERequestType::QUERY_BEGINTRANSACTION},
53+
{"Query.CommitTransaction", ERequestType::QUERY_COMMITTRANSACTION},
54+
{"Query.RollbackTransaction", ERequestType::QUERY_ROLLBACKTRANSACTION},
55+
56+
{"Discovery.WhoAmI", ERequestType::DISCOVERY_WHOAMI},
57+
{"Discovery.NodeRegistration", ERequestType::DISCOVERY_NODEREGISTRATION},
58+
{"Discovery.ListEndpoints", ERequestType::DISCOVERY_LISTENDPOINTS},
59+
60+
{"RateLimiter.CreateResource", ERequestType::RATELIMITER_CREATE_RESOURCE},
61+
{"RateLimiter.AlterResource", ERequestType::RATELIMITER_ALTER_RESOURCE},
62+
{"RateLimiter.DropResource", ERequestType::RATELIMITER_DROP_RESOURCE},
63+
{"RateLimiter.ListResources", ERequestType::RATELIMITER_LIST_RESOURCES},
64+
{"RateLimiter.DescribeResource", ERequestType::RATELIMITER_DESCRIBE_RESOURCE},
65+
{"RateLimiter.AcquireResource", ERequestType::RATELIMITER_ACQUIRE_RESOURCE},
66+
67+
{"BSConfig.ReplaceStorageConfig", ERequestType::BSCONFIG_REPLACESTORAGECONFIG},
68+
{"BSConfig.FetchStorageConfig", ERequestType::BSCONFIG_FETCHSTORAGECONFIG},
69+
{"BSConfig.Bootstrap", ERequestType::BSCONFIG_BOOTSTRAP},
70+
71+
{"Topic.StreamWrite", ERequestType::TOPIC_STREAMWRITE},
72+
{"Topic.StreamWrite.Init", ERequestType::TOPIC_STREAMWRITE_INIT},
73+
{"Topic.StreamWrite.Write", ERequestType::TOPIC_STREAMWRITE_WRITE},
74+
{"Topic.StreamWrite.UpdateToken", ERequestType::TOPIC_STREAMWRITE_UPDATE_TOKEN},
75+
{"Topic.StreamRead", ERequestType::TOPIC_STREAMREAD},
76+
{"Topic.StreamRead.Init", ERequestType::TOPIC_STREAMREAD_INIT},
77+
{"Topic.StreamRead.Read", ERequestType::TOPIC_STREAMREAD_READ},
78+
{"Topic.StreamRead.CommitOffset", ERequestType::TOPIC_STREAMREAD_COMMIT_OFFSET},
79+
{"Topic.StreamRead.PartitionSessionStatus", ERequestType::TOPIC_STREAMREAD_PARTITION_SESSION_STATUS},
80+
{"Topic.StreamRead.UpdateToken", ERequestType::TOPIC_STREAMREAD_UPDATE_TOKEN},
81+
{"Topic.StreamRead.DirectReadAck", ERequestType::TOPIC_STREAMREAD_DIRECT_READ_ACK},
82+
{"Topic.StreamRead.StartPartitionSession", ERequestType::TOPIC_STREAMREAD_START_PARTITION_SESSION},
83+
{"Topic.StreamRead.StopPartitionSession", ERequestType::TOPIC_STREAMREAD_STOP_PARTITION_SESSION},
84+
{"Topic.StreamDirectRead", ERequestType::TOPIC_STREAMDIRECTREAD},
85+
{"Topic.StreamDirectRead.Init", ERequestType::TOPIC_STREAMDIRECTREAD_INIT},
86+
{"Topic.StreamDirectRead.StartDirectReadPartitionSession", ERequestType::TOPIC_STREAMDIRECTREAD_START_DIRECT_READ_PARTITION_SESSION},
87+
{"Topic.StreamDirectRead.UpdateToken", ERequestType::TOPIC_STREAMDIRECTREAD_UPDATE_TOKEN},
88+
{"Topic.CommitOffset", ERequestType::TOPIC_COMMITOFFSET},
89+
{"Topic.UpdateOffsetsInTransaction", ERequestType::TOPIC_UPDATEOFFSETSINTRANSACTION},
90+
{"Topic.CreateTopic", ERequestType::TOPIC_CREATETOPIC},
91+
{"Topic.DescribeTopic", ERequestType::TOPIC_DESCRIBETOPIC},
92+
{"Topic.DescribePartition", ERequestType::TOPIC_DESCRIBEPARTITION},
93+
{"Topic.DescribeConsumer", ERequestType::TOPIC_DESCRIBECONSUMER},
94+
{"Topic.AlterTopic", ERequestType::TOPIC_ALTERTOPIC},
95+
{"Topic.DropTopic", ERequestType::TOPIC_DROPTOPIC},
96+
};
97+
98+
extern const THashSet<ERequestType> NoDefaultSamplingRequestTypes = {
99+
ERequestType::TOPIC_STREAMWRITE,
100+
ERequestType::TOPIC_STREAMREAD,
101+
ERequestType::TOPIC_STREAMDIRECTREAD,
102+
};
103+
10104
} // namespace NKikimr::NJaegerTracing

0 commit comments

Comments
 (0)