Skip to content

Commit 2d120ed

Browse files
authored
YQ-3809 Shared reading: fix logging (#11175)
1 parent 7cf0383 commit 2d120ed

File tree

2 files changed

+45
-26
lines changed

2 files changed

+45
-26
lines changed

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ struct TEvPrivate {
5050
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
5151
EvCoordinatorPing = EvBegin + 20,
5252
EvUpdateMetrics,
53+
EvPrintStateToLog,
5354
EvEnd
5455
};
5556

5657
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
5758
struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {};
5859
struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {};
60+
struct TEvPrintStateToLog : public NActors::TEventLocal<TEvPrintStateToLog, EvPrintStateToLog> {};
5961
};
6062

6163
struct TQueryStat {
@@ -65,6 +67,7 @@ struct TQueryStat {
6567
};
6668

6769
ui64 UpdateMetricsPeriodSec = 60;
70+
ui64 PrintStateToLogPeriodSec = 300;
6871

6972
class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
7073

@@ -215,10 +218,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
215218
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
216219
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
217220
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
218-
221+
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
222+
219223
void DeleteConsumer(const ConsumerSessionKey& key);
220224
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
221225
void UpdateMetrics();
226+
void PrintInternalState();
222227

223228
STRICT_STFUNC(
224229
StateFunc, {
@@ -242,6 +247,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
242247
hFunc(NActors::TEvents::TEvPing, Handle);
243248
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
244249
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
250+
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
245251
})
246252
};
247253

@@ -275,6 +281,7 @@ void TRowDispatcher::Bootstrap() {
275281
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());
276282
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
277283
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
284+
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
278285
}
279286

280287
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -339,18 +346,34 @@ void TRowDispatcher::UpdateMetrics() {
339346
return;
340347
}
341348
TMap<TString, TQueryStat> queryStats;
342-
TStringStream str;
349+
350+
for (auto& [key, sessionsInfo] : TopicSessions) {
351+
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
352+
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
353+
auto& stat = queryStats[consumer->QueryId];
354+
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
355+
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
356+
}
357+
}
358+
}
359+
for (const auto& [queryId, stat] : queryStats) {
360+
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
361+
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
362+
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
363+
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
364+
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
365+
}
366+
}
343367

368+
void TRowDispatcher::PrintInternalState() {
369+
TStringStream str;
344370
str << "Statistics:\n";
345371
for (auto& [key, sessionsInfo] : TopicSessions) {
346372
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
347373
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
348374
str << " / " << actorId << "\n";
349375
str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n";
350376
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
351-
auto& stat = queryStats[consumer->QueryId];
352-
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
353-
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
354377
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
355378
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
356379
<< " get " << consumer->Counters.GetNextBatch
@@ -361,15 +384,6 @@ void TRowDispatcher::UpdateMetrics() {
361384
}
362385
}
363386
LOG_ROW_DISPATCHER_DEBUG(str.Str());
364-
365-
for (const auto& [queryId, stat] : queryStats) {
366-
LOG_ROW_DISPATCHER_DEBUG("UnreadBytes " << queryId << " " << stat.UnreadBytes.Max);
367-
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
368-
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
369-
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
370-
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
371-
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
372-
}
373387
}
374388

375389
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -609,6 +623,11 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
609623
UpdateMetrics();
610624
}
611625

626+
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
627+
PrintInternalState();
628+
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
629+
}
630+
612631
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
613632
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
614633
const auto& key = ev->Get()->Stat.SessionKey;

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
192192
void DoFiltering(const TVector<ui64>& offsets, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
193193
void SendData(TClientsInfo& info);
194194
void UpdateParser();
195-
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter = nullptr);
195+
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription);
196196
void SendDataArrived(TClientsInfo& client);
197197
void StopReadSession();
198198
TString GetSessionId() const;
@@ -494,7 +494,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose
494494
LOG_ROW_DISPATCHER_DEBUG(message);
495495
NYql::TIssues issues;
496496
issues.AddIssue(message);
497-
Self.FatalError(issues.ToOneLineString());
497+
Self.FatalError(issues.ToOneLineString(), nullptr, false);
498498
}
499499

500500
void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event) {
@@ -580,7 +580,7 @@ void TTopicSession::DoParsing(bool force) {
580580
const auto& parsedValues = Parser->Parse();
581581
DoFiltering(Parser->GetOffsets(), parsedValues);
582582
} catch (const std::exception& e) {
583-
FatalError(e.what());
583+
FatalError(e.what(), nullptr, true);
584584
}
585585
}
586586

@@ -594,7 +594,7 @@ void TTopicSession::DoFiltering(const TVector<ui64>& offsets, const TVector<NKik
594594
info.Filter->Push(offsets, RebuildJson(info, parsedValues));
595595
}
596596
} catch (const std::exception& e) {
597-
FatalError(e.what(), &info.Filter);
597+
FatalError(e.what(), &info.Filter, false);
598598
}
599599
}
600600

@@ -664,7 +664,7 @@ bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
664664
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
665665
auto it = Clients.find(ev->Sender);
666666
if (it != Clients.end()) {
667-
FatalError("Internal error: sender " + ev->Sender.ToString());
667+
FatalError("Internal error: sender " + ev->Sender.ToString(), nullptr, false);
668668
return;
669669
}
670670

@@ -712,11 +712,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
712712
}
713713
}
714714
} catch (const NYql::NPureCalc::TCompileError& e) {
715-
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues());
715+
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues(), nullptr, true);
716716
} catch (const yexception &ex) {
717-
FatalError(TString{"Adding new client failed: "} + ex.what());
717+
FatalError(TString{"Adding new client failed: "} + ex.what(), nullptr, true);
718718
} catch (...) {
719-
FatalError("Adding new client failed, " + CurrentExceptionMessage());
719+
FatalError("Adding new client failed, " + CurrentExceptionMessage(), nullptr, true);
720720
}
721721
UpdateParser();
722722
SendStatistic();
@@ -810,14 +810,14 @@ void TTopicSession::UpdateParser() {
810810
const auto& parserConfig = Config.GetJsonParser();
811811
Parser = NewJsonParser(names, types, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()));
812812
} catch (const NYql::NPureCalc::TCompileError& e) {
813-
FatalError(e.GetIssues());
813+
FatalError(e.GetIssues(), nullptr, true);
814814
}
815815
}
816816

817-
void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter) {
817+
void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription) {
818818
TStringStream str;
819819
str << message;
820-
if (Parser) {
820+
if (Parser && addParserDescription) {
821821
str << ", parser description:\n" << Parser->GetDescription();
822822
}
823823
if (filter) {
@@ -868,7 +868,7 @@ void TTopicSession::HandleException(const std::exception& e) {
868868
if (CurrentStateFunc() == &TThis::ErrorState) {
869869
return;
870870
}
871-
FatalError(TString("Internal error: exception: ") + e.what());
871+
FatalError(TString("Internal error: exception: ") + e.what(), nullptr, false);
872872
}
873873

874874
void TTopicSession::SendStatistic() {

0 commit comments

Comments
 (0)