Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 97 additions & 68 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
html << "<h3>State</h3>";
html << "<pre>" << ComputeActorState.DebugString() << "</pre>";

#define DUMP(P, X) html << #X ": " << P.X << "<br />"
#define DUMP(P, X,...) html << #X ": " << P.X __VA_ARGS__ << "<br />"
#define DUMP_PREFIXED(TITLE, S, FIELD,...) html << TITLE << #FIELD ": " << S . FIELD __VA_ARGS__ << "<br />"
html << "<h4>ProcessSourcesState</h4>";
DUMP(ProcessSourcesState, Inflight);
html << "<h4>ProcessOutputsState</h4>";
DUMP(ProcessOutputsState, Inflight);
DUMP(ProcessOutputsState, ChannelsReady);
DUMP(ProcessOutputsState, HasDataToSend);
DUMP(ProcessOutputsState, DataWasSent);
DUMP(ProcessOutputsState, AllOutputsFinished);
DUMP(ProcessOutputsState, LastRunStatus);
DUMP(ProcessOutputsState, LastPopReturnedNoData);
Expand All @@ -192,12 +194,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
html << "<h3>CPU Quota</h3>";
html << "QuoterServiceActorId: " << QuoterServiceActorId.ToString() << "<br />";
if (ContinueRunEvent) {
html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->AskFreeSpace << "<br />";
html << "ContinueRunEvent.CheckpointOnly: " << ContinueRunEvent->CheckpointOnly << "<br />";
html << "ContinueRunEvent.CheckpointRequest: " << ContinueRunEvent->CheckpointRequest.Defined() << "<br />";
html << "ContinueRunEvent.WatermarkRequest: " << ContinueRunEvent->WatermarkRequest.Defined() << "<br />";
html << "ContinueRunEvent.CheckpointOnly: " << ContinueRunEvent->CheckpointOnly << "<br />";
html << "ContinueRunEvent.MemLimit: " << ContinueRunEvent->MemLimit << "<br />";
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), AskFreeSpace);
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), CheckpointOnly);
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), CheckpointRequest, .Defined());
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), WatermarkRequest, .Defined());
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), MemLimit);
for (const auto& sinkId: ContinueRunEvent->SinkIds) {
html << "ContinueRunEvent.SinkIds: " << sinkId << "<br />";
}
Expand All @@ -207,46 +208,45 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}
}

html << "ContinueRunStartWaitTime: " << ContinueRunStartWaitTime.ToString() << "<br />";
html << "ContinueRunInflight: " << ContinueRunInflight << "<br />";
html << "CpuTimeSpent: " << CpuTimeSpent.ToString() << "<br />";
html << "CpuTimeQuotaAsked: " << CpuTimeQuotaAsked.ToString() << "<br />";
html << "UseCpuQuota: " << UseCpuQuota() << "<br />";

DUMP((*this), ContinueRunStartWaitTime, .ToString());
DUMP((*this), ContinueRunInflight);
DUMP((*this), CpuTimeSpent, .ToString());
DUMP((*this), CpuTimeQuotaAsked, .ToString());
DUMP((*this), UseCpuQuota, ());

html << "<h3>Checkpoints</h3>";
html << "ReadyToCheckpoint: " << ReadyToCheckpoint() << "<br />";
html << "CheckpointRequestedFromTaskRunner: " << CheckpointRequestedFromTaskRunner << "<br />";
DUMP((*this), ReadyToCheckpoint, ());
DUMP((*this), CheckpointRequestedFromTaskRunner);

auto dumpAsyncStats = [&](auto prefix, auto& asyncStats) {
html << prefix << "Level: " << static_cast<int>(asyncStats.Level) << "<br />";
html << prefix << "MinWaitDuration: " << asyncStats.MinWaitDuration.ToString() << "<br />";
DUMP_PREFIXED(prefix, asyncStats, MinWaitDuration, .ToString());
html << prefix << "CurrentPauseTs: " << (asyncStats.CurrentPauseTs ? asyncStats.CurrentPauseTs->ToString() : TString{}) << "<br />";
html << prefix << "MergeWaitPeriod: " << asyncStats.MergeWaitPeriod << "<br />";
html << prefix << "Bytes: " << asyncStats.Bytes << "<br />";
html << prefix << "DecompressedBytes: " << asyncStats.DecompressedBytes << "<br />";
html << prefix << "Rows: " << asyncStats.Rows << "<br />";
html << prefix << "Chunks: " << asyncStats.Chunks << "<br />";
html << prefix << "Splits: " << asyncStats.Splits << "<br />";
html << prefix << "FirstMessageTs: " << asyncStats.FirstMessageTs.ToString() << "<br />";
html << prefix << "PauseMessageTs: " << asyncStats.PauseMessageTs.ToString() << "<br />";
html << prefix << "ResumeMessageTs: " << asyncStats.ResumeMessageTs.ToString() << "<br />";
html << prefix << "LastMessageTs: " << asyncStats.LastMessageTs.ToString() << "<br />";
html << prefix << "WaitTime: " << asyncStats.WaitTime.ToString() << "<br />";
DUMP_PREFIXED(prefix, asyncStats, MergeWaitPeriod);
DUMP_PREFIXED(prefix, asyncStats, Bytes);
DUMP_PREFIXED(prefix, asyncStats, DecompressedBytes);
DUMP_PREFIXED(prefix, asyncStats, Rows);
DUMP_PREFIXED(prefix, asyncStats, Chunks);
DUMP_PREFIXED(prefix, asyncStats, Splits);
DUMP_PREFIXED(prefix, asyncStats, FirstMessageTs, .ToString());
DUMP_PREFIXED(prefix, asyncStats, PauseMessageTs, .ToString());
DUMP_PREFIXED(prefix, asyncStats, ResumeMessageTs, .ToString());
DUMP_PREFIXED(prefix, asyncStats, LastMessageTs, .ToString());
DUMP_PREFIXED(prefix, asyncStats, WaitTime, .ToString());
};

auto dumpOutputStats = [&](auto prefix, auto& outputStats) {
html << prefix << "MaxMemoryUsage: " << outputStats.MaxMemoryUsage << "<br />";
html << prefix << "MaxRowsInMemory: " << outputStats.MaxRowsInMemory << "<br />";
DUMP_PREFIXED(prefix, outputStats, MaxMemoryUsage);
DUMP_PREFIXED(prefix, outputStats, MaxRowsInMemory);
dumpAsyncStats(prefix, outputStats);
};

auto dumpInputChannelStats = [&](auto prefix, auto& pushStats) {
html << prefix << "ChannelId: " << pushStats.ChannelId << "<br />";
html << prefix << "SrcStageId: " << pushStats.SrcStageId << "<br />";
html << prefix << "RowsInMemory: " << pushStats.RowsInMemory << "<br />";
html << prefix << "MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />";
html << prefix << "DeserializationTime: " << pushStats.DeserializationTime.ToString() << "<br />";
DUMP_PREFIXED(prefix, pushStats, ChannelId);
DUMP_PREFIXED(prefix, pushStats, SrcStageId);
DUMP_PREFIXED(prefix, pushStats, RowsInMemory);
DUMP_PREFIXED(prefix, pushStats, MaxMemoryUsage);
DUMP_PREFIXED(prefix, pushStats, DeserializationTime, .ToString());
dumpAsyncStats(prefix, pushStats);
};

Expand All @@ -265,19 +265,32 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "<br />";
DUMP(info, FreeSpace);
html << "IsPaused: " << info.IsPaused() << "<br />";
if (info.Channel) {
html << "DqInputChannel.ChannelId: " << info.Channel->GetChannelId() << "<br />";
html << "DqInputChannel.FreeSpace: " << info.Channel->GetFreeSpace() << "<br />";
html << "DqInputChannel.StoredBytes: " << info.Channel->GetStoredBytes() << "<br />";
html << "DqInputChannel.Empty: " << info.Channel->Empty() << "<br />";
html << "DqInputChannel.InputType: " << (info.Channel->GetInputType() ? info.Channel->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
html << "DqInputChannel.InputWidth: " << (info.Channel->GetInputWidth() ? ToString(*info.Channel->GetInputWidth()) : TString{"unknown"}) << "<br />";
html << "DqInputChannel.IsFinished: " << info.Channel->IsFinished() << "<br />";

const auto& pushStats = info.Channel->GetPushStats();
auto channel = info.Channel;
if (!channel) {
auto stats = GetTaskRunnerStats();
if (stats) {
auto stageIt = stats->InputChannels.find(info.SrcStageId);
if (stageIt != stats->InputChannels.end()) {
auto channelIt = stageIt->second.find(info.ChannelId);
if (channelIt != stageIt->second.end()) {
channel = channelIt->second;
}
}
}
}
if (channel) {
html << "DqInputChannel.ChannelId: " << channel->GetChannelId() << "<br />";
html << "DqInputChannel.FreeSpace: " << channel->GetFreeSpace() << "<br />";
html << "DqInputChannel.StoredBytes: " << channel->GetStoredBytes() << "<br />";
html << "DqInputChannel.Empty: " << channel->Empty() << "<br />";
html << "DqInputChannel.InputType: " << (channel->GetInputType() ? channel->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
html << "DqInputChannel.InputWidth: " << (channel->GetInputWidth() ? ToString(*channel->GetInputWidth()) : TString{"unknown"}) << "<br />";
html << "DqInputChannel.IsFinished: " << channel->IsFinished() << "<br />";

const auto& pushStats = channel->GetPushStats();
dumpInputChannelStats("DqInputChannel.PushStats.", pushStats);

const auto& popStats = info.Channel->GetPopStats();
const auto& popStats = channel->GetPopStats();
dumpInputStats("DqInputChannel.PopStats."sv, popStats);
}
}
Expand All @@ -290,18 +303,19 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
html << "PendingWatermark: " << !!info.PendingWatermark << " " << (!info.PendingWatermark ? TString{} : info.PendingWatermark->ToString()) << "<br />";
html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";
html << "FreeSpace: " << info.GetFreeSpace() << "<br />";
if (info.Buffer) {
html << "DqInputBuffer.InputIndex: " << info.Buffer->GetInputIndex() << "<br />";
html << "DqInputBuffer.FreeSpace: " << info.Buffer->GetFreeSpace() << "<br />";
html << "DqInputBuffer.StoredBytes: " << info.Buffer->GetStoredBytes() << "<br />";
html << "DqInputBuffer.Empty: " << info.Buffer->Empty() << "<br />";
html << "DqInputBuffer.InputType: " << (info.Buffer->GetInputType() ? info.Buffer->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.InputWidth: " << (info.Buffer->GetInputWidth() ? ToString(*info.Buffer->GetInputWidth()) : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.IsFinished: " << info.Buffer->IsFinished() << "<br />";
html << "DqInputBuffer.IsPaused: " << info.Buffer->IsPaused() << "<br />";
html << "DqInputBuffer.IsPending: " << info.Buffer->IsPending() << "<br />";

const auto& popStats = info.Buffer->GetPopStats();
auto buffer = info.Buffer;
if (buffer) {
html << "DqInputBuffer.InputIndex: " << buffer->GetInputIndex() << "<br />";
html << "DqInputBuffer.FreeSpace: " << buffer->GetFreeSpace() << "<br />";
html << "DqInputBuffer.StoredBytes: " << buffer->GetStoredBytes() << "<br />";
html << "DqInputBuffer.Empty: " << buffer->Empty() << "<br />";
html << "DqInputBuffer.InputType: " << (buffer->GetInputType() ? buffer->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.InputWidth: " << (buffer->GetInputWidth() ? ToString(*buffer->GetInputWidth()) : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.IsFinished: " << buffer->IsFinished() << "<br />";
html << "DqInputBuffer.IsPaused: " << buffer->IsPaused() << "<br />";
html << "DqInputBuffer.IsPending: " << buffer->IsPending() << "<br />";

const auto& popStats = buffer->GetPopStats();
dumpInputStats("DqInputBuffer."sv, popStats);
}
if (info.AsyncInput) {
Expand Down Expand Up @@ -332,18 +346,32 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
html << "AsyncData.Watermark: " << info.AsyncData->Watermark << "<br />";
}

if (info.Channel) {
html << "DqOutputChannel.ChannelId: " << info.Channel->GetChannelId() << "<br />";
html << "DqOutputChannel.ValuesCount: " << info.Channel->GetValuesCount() << "<br />";
html << "DqOutputChannel.IsFull: " << info.Channel->IsFull() << "<br />";
html << "DqOutputChannel.HasData: " << info.Channel->HasData() << "<br />";
html << "DqOutputChannel.IsFinished: " << info.Channel->IsFinished() << "<br />";
html << "DqInputChannel.OutputType: " << (info.Channel->GetOutputType() ? info.Channel->GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
auto channel = info.Channel;
if (!channel) {
auto stats = GetTaskRunnerStats();
if (stats) {
auto stageIt = stats->OutputChannels.find(info.DstStageId);
if (stageIt != stats->OutputChannels.end()) {
auto channelIt = stageIt->second.find(info.ChannelId);
if (channelIt != stageIt->second.end()) {
channel = channelIt->second;
}
}
}
}

if (channel) {
html << "DqOutputChannel.ChannelId: " << channel->GetChannelId() << "<br />";
html << "DqOutputChannel.ValuesCount: " << channel->GetValuesCount() << "<br />";
html << "DqOutputChannel.IsFull: " << channel->IsFull() << "<br />";
html << "DqOutputChannel.HasData: " << channel->HasData() << "<br />";
html << "DqOutputChannel.IsFinished: " << channel->IsFinished() << "<br />";
html << "DqInputChannel.OutputType: " << (channel->GetOutputType() ? channel->GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";

const auto& pushStats = info.Channel->GetPushStats();
const auto& pushStats = channel->GetPushStats();
dumpOutputStats("DqOutputChannel.PushStats."sv, pushStats);

const auto& popStats = info.Channel->GetPopStats();
const auto& popStats = channel->GetPopStats();
html << "DqOutputChannel.PopStats.ChannelId: " << popStats.ChannelId << "<br />";
html << "DqOutputChannel.PopStats.DstStageId: " << popStats.DstStageId << "<br />";
html << "DqOutputChannel.PopStats.MaxMemoryUsage: " << popStats.MaxMemoryUsage << "<br />";
Expand All @@ -364,8 +392,8 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
DUMP(info, Finished);
DUMP(info, FinishIsAcknowledged);
DUMP(info, PopStarted);
if (info.Buffer) {
const auto& buffer = *info.Buffer;
if (info.Buffer || TaskRunnerStats.GetSink(id)) {
const auto& buffer = info.Buffer ? *info.Buffer : *TaskRunnerStats.GetSink(id);
html << "DqOutputBuffer.OutputIndex: " << buffer.GetOutputIndex() << "<br />";
html << "DqOutputBuffer.IsFull: " << buffer.IsFull() << "<br />";
html << "DqOutputBuffer.OutputType: " << (buffer.GetOutputType() ? buffer.GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
Expand Down Expand Up @@ -404,6 +432,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}
}
#undef DUMP
#undef DUMP_PREFIXED

Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(html.Str()));
}
Expand Down
Loading