Skip to content

Commit 9460c14

Browse files
authored
Merge 34c1e68 into 846a92b
2 parents 846a92b + 34c1e68 commit 9460c14

File tree

2 files changed

+35
-26
lines changed

2 files changed

+35
-26
lines changed

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ struct TQueryStat {
6464
NYql::TCounters::TEntry UnreadBytes;
6565
};
6666

67-
ui64 UpdateMetricsPeriodSec = 60;
67+
ui64 UpdateMetricsPeriodSec = 300;
6868

6969
class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
7070

@@ -219,6 +219,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
219219
void DeleteConsumer(const ConsumerSessionKey& key);
220220
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
221221
void UpdateMetrics();
222+
void PrintInternalState();
222223

223224
STRICT_STFUNC(
224225
StateFunc, {
@@ -339,18 +340,34 @@ void TRowDispatcher::UpdateMetrics() {
339340
return;
340341
}
341342
TMap<TString, TQueryStat> queryStats;
342-
TStringStream str;
343+
344+
for (auto& [key, sessionsInfo] : TopicSessions) {
345+
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
346+
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
347+
auto& stat = queryStats[consumer->QueryId];
348+
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
349+
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
350+
}
351+
}
352+
}
353+
for (const auto& [queryId, stat] : queryStats) {
354+
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
355+
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
356+
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
357+
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
358+
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
359+
}
360+
}
343361

362+
void TRowDispatcher::PrintInternalState() {
363+
TStringStream str;
344364
str << "Statistics:\n";
345365
for (auto& [key, sessionsInfo] : TopicSessions) {
346366
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
347367
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
348368
str << " / " << actorId << "\n";
349369
str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n";
350370
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
351-
auto& stat = queryStats[consumer->QueryId];
352-
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
353-
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
354371
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
355372
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
356373
<< " get " << consumer->Counters.GetNextBatch
@@ -361,15 +378,6 @@ void TRowDispatcher::UpdateMetrics() {
361378
}
362379
}
363380
LOG_ROW_DISPATCHER_DEBUG(str.Str());
364-
365-
for (const auto& [queryId, stat] : queryStats) {
366-
LOG_ROW_DISPATCHER_DEBUG("UnreadBytes " << queryId << " " << stat.UnreadBytes.Max);
367-
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
368-
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
369-
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
370-
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
371-
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
372-
}
373381
}
374382

375383
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -607,6 +615,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev) {
607615
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
608616
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
609617
UpdateMetrics();
618+
PrintInternalState();
610619
}
611620

612621
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
192192
void DoFiltering(const TVector<ui64>& offsets, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
193193
void SendData(TClientsInfo& info);
194194
void UpdateParser();
195-
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter = nullptr);
195+
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription);
196196
void SendDataArrived(TClientsInfo& client);
197197
void StopReadSession();
198198
TString GetSessionId() const;
@@ -494,7 +494,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose
494494
LOG_ROW_DISPATCHER_DEBUG(message);
495495
NYql::TIssues issues;
496496
issues.AddIssue(message);
497-
Self.FatalError(issues.ToOneLineString());
497+
Self.FatalError(issues.ToOneLineString(), nullptr, false);
498498
}
499499

500500
void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event) {
@@ -580,7 +580,7 @@ void TTopicSession::DoParsing(bool force) {
580580
const auto& parsedValues = Parser->Parse();
581581
DoFiltering(Parser->GetOffsets(), parsedValues);
582582
} catch (const std::exception& e) {
583-
FatalError(e.what());
583+
FatalError(e.what(), nullptr, true);
584584
}
585585
}
586586

@@ -594,7 +594,7 @@ void TTopicSession::DoFiltering(const TVector<ui64>& offsets, const TVector<NKik
594594
info.Filter->Push(offsets, RebuildJson(info, parsedValues));
595595
}
596596
} catch (const std::exception& e) {
597-
FatalError(e.what(), &info.Filter);
597+
FatalError(e.what(), &info.Filter, false);
598598
}
599599
}
600600

@@ -664,7 +664,7 @@ bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
664664
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
665665
auto it = Clients.find(ev->Sender);
666666
if (it != Clients.end()) {
667-
FatalError("Internal error: sender " + ev->Sender.ToString());
667+
FatalError("Internal error: sender " + ev->Sender.ToString(), nullptr, false);
668668
return;
669669
}
670670

@@ -712,11 +712,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
712712
}
713713
}
714714
} catch (const NYql::NPureCalc::TCompileError& e) {
715-
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues());
715+
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues(), nullptr, true);
716716
} catch (const yexception &ex) {
717-
FatalError(TString{"Adding new client failed: "} + ex.what());
717+
FatalError(TString{"Adding new client failed: "} + ex.what(), nullptr, true);
718718
} catch (...) {
719-
FatalError("Adding new client failed, " + CurrentExceptionMessage());
719+
FatalError("Adding new client failed, " + CurrentExceptionMessage(), nullptr, true);
720720
}
721721
UpdateParser();
722722
SendStatistic();
@@ -810,14 +810,14 @@ void TTopicSession::UpdateParser() {
810810
const auto& parserConfig = Config.GetJsonParser();
811811
Parser = NewJsonParser(names, types, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()));
812812
} catch (const NYql::NPureCalc::TCompileError& e) {
813-
FatalError(e.GetIssues());
813+
FatalError(e.GetIssues(), nullptr, true);
814814
}
815815
}
816816

817-
void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter) {
817+
void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription) {
818818
TStringStream str;
819819
str << message;
820-
if (Parser) {
820+
if (Parser && addParserDescription) {
821821
str << ", parser description:\n" << Parser->GetDescription();
822822
}
823823
if (filter) {
@@ -868,7 +868,7 @@ void TTopicSession::HandleException(const std::exception& e) {
868868
if (CurrentStateFunc() == &TThis::ErrorState) {
869869
return;
870870
}
871-
FatalError(TString("Internal error: exception: ") + e.what());
871+
FatalError(TString("Internal error: exception: ") + e.what(), nullptr, false);
872872
}
873873

874874
void TTopicSession::SendStatistic() {

0 commit comments

Comments
 (0)