Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ struct TEvPrivate {
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvCoordinatorPing = EvBegin + 20,
EvUpdateMetrics,
EvPrintStateToLog,
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {};
struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {};
struct TEvPrintStateToLog : public NActors::TEventLocal<TEvPrintStateToLog, EvPrintStateToLog> {};
};

struct TQueryStat {
Expand All @@ -65,6 +67,7 @@ struct TQueryStat {
};

ui64 UpdateMetricsPeriodSec = 60;
ui64 PrintStateToLogPeriodSec = 300;

class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {

Expand Down Expand Up @@ -215,10 +218,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);

void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);

void DeleteConsumer(const ConsumerSessionKey& key);
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
void UpdateMetrics();
void PrintInternalState();

STRICT_STFUNC(
StateFunc, {
Expand All @@ -242,6 +247,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
hFunc(NActors::TEvents::TEvPing, Handle);
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
})
};

Expand Down Expand Up @@ -275,6 +281,7 @@ void TRowDispatcher::Bootstrap() {
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
Expand Down Expand Up @@ -339,18 +346,34 @@ void TRowDispatcher::UpdateMetrics() {
return;
}
TMap<TString, TQueryStat> queryStats;
TStringStream str;

for (auto& [key, sessionsInfo] : TopicSessions) {
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
auto& stat = queryStats[consumer->QueryId];
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
}
}
}
for (const auto& [queryId, stat] : queryStats) {
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
}
}

void TRowDispatcher::PrintInternalState() {
TStringStream str;
str << "Statistics:\n";
for (auto& [key, sessionsInfo] : TopicSessions) {
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
str << " / " << actorId << "\n";
str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n";
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
auto& stat = queryStats[consumer->QueryId];
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
<< " get " << consumer->Counters.GetNextBatch
Expand All @@ -361,15 +384,6 @@ void TRowDispatcher::UpdateMetrics() {
}
}
LOG_ROW_DISPATCHER_DEBUG(str.Str());

for (const auto& [queryId, stat] : queryStats) {
LOG_ROW_DISPATCHER_DEBUG("UnreadBytes " << queryId << " " << stat.UnreadBytes.Max);
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
}
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
Expand Down Expand Up @@ -609,6 +623,11 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
UpdateMetrics();
}

void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
PrintInternalState();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

давай строчки местами поменяем, чтобы сами себя не заспамили, если функция медленная

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Переставил

Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
const auto& key = ev->Get()->Stat.SessionKey;
Expand Down
24 changes: 12 additions & 12 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
void DoFiltering(const TVector<ui64>& offsets, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
void SendData(TClientsInfo& info);
void UpdateParser();
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter = nullptr);
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription);
void SendDataArrived(TClientsInfo& client);
void StopReadSession();
TString GetSessionId() const;
Expand Down Expand Up @@ -494,7 +494,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose
LOG_ROW_DISPATCHER_DEBUG(message);
NYql::TIssues issues;
issues.AddIssue(message);
Self.FatalError(issues.ToOneLineString());
Self.FatalError(issues.ToOneLineString(), nullptr, false);
}

void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event) {
Expand Down Expand Up @@ -580,7 +580,7 @@ void TTopicSession::DoParsing(bool force) {
const auto& parsedValues = Parser->Parse();
DoFiltering(Parser->GetOffsets(), parsedValues);
} catch (const std::exception& e) {
FatalError(e.what());
FatalError(e.what(), nullptr, true);
}
}

Expand All @@ -594,7 +594,7 @@ void TTopicSession::DoFiltering(const TVector<ui64>& offsets, const TVector<NKik
info.Filter->Push(offsets, RebuildJson(info, parsedValues));
}
} catch (const std::exception& e) {
FatalError(e.what(), &info.Filter);
FatalError(e.what(), &info.Filter, false);
}
}

Expand Down Expand Up @@ -664,7 +664,7 @@ bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
auto it = Clients.find(ev->Sender);
if (it != Clients.end()) {
FatalError("Internal error: sender " + ev->Sender.ToString());
FatalError("Internal error: sender " + ev->Sender.ToString(), nullptr, false);
return;
}

Expand Down Expand Up @@ -712,11 +712,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
}
}
} catch (const NYql::NPureCalc::TCompileError& e) {
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues());
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues(), nullptr, true);
} catch (const yexception &ex) {
FatalError(TString{"Adding new client failed: "} + ex.what());
FatalError(TString{"Adding new client failed: "} + ex.what(), nullptr, true);
} catch (...) {
FatalError("Adding new client failed, " + CurrentExceptionMessage());
FatalError("Adding new client failed, " + CurrentExceptionMessage(), nullptr, true);
}
UpdateParser();
SendStatistic();
Expand Down Expand Up @@ -810,14 +810,14 @@ void TTopicSession::UpdateParser() {
const auto& parserConfig = Config.GetJsonParser();
Parser = NewJsonParser(names, types, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()));
} catch (const NYql::NPureCalc::TCompileError& e) {
FatalError(e.GetIssues());
FatalError(e.GetIssues(), nullptr, true);
}
}

void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter) {
void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription) {
TStringStream str;
str << message;
if (Parser) {
if (Parser && addParserDescription) {
str << ", parser description:\n" << Parser->GetDescription();
}
if (filter) {
Expand Down Expand Up @@ -868,7 +868,7 @@ void TTopicSession::HandleException(const std::exception& e) {
if (CurrentStateFunc() == &TThis::ErrorState) {
return;
}
FatalError(TString("Internal error: exception: ") + e.what());
FatalError(TString("Internal error: exception: ") + e.what(), nullptr, false);
}

void TTopicSession::SendStatistic() {
Expand Down
Loading