@@ -173,13 +173,15 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
173173 html << " <h3>State</h3>" ;
174174 html << " <pre>" << ComputeActorState.DebugString () << " </pre>" ;
175175
176- #define DUMP (P, X ) html << #X " : " << P.X << " <br />"
176+ #define DUMP (P, X,...) html << #X " : " << P.X __VA_ARGS__ << " <br />"
177+ #define DUMP_PREFIXED (TITLE, S, FIELD,...) html << TITLE << #FIELD " : " << S . FIELD __VA_ARGS__ << " <br />"
177178 html << " <h4>ProcessSourcesState</h4>" ;
178179 DUMP (ProcessSourcesState, Inflight);
179180 html << " <h4>ProcessOutputsState</h4>" ;
180181 DUMP (ProcessOutputsState, Inflight);
181182 DUMP (ProcessOutputsState, ChannelsReady);
182183 DUMP (ProcessOutputsState, HasDataToSend);
184+ DUMP (ProcessOutputsState, DataWasSent);
183185 DUMP (ProcessOutputsState, AllOutputsFinished);
184186 DUMP (ProcessOutputsState, LastRunStatus);
185187 DUMP (ProcessOutputsState, LastPopReturnedNoData);
@@ -192,12 +194,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
192194 html << " <h3>CPU Quota</h3>" ;
193195 html << " QuoterServiceActorId: " << QuoterServiceActorId.ToString () << " <br />" ;
194196 if (ContinueRunEvent) {
195- html << " ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->AskFreeSpace << " <br />" ;
196- html << " ContinueRunEvent.CheckpointOnly: " << ContinueRunEvent->CheckpointOnly << " <br />" ;
197- html << " ContinueRunEvent.CheckpointRequest: " << ContinueRunEvent->CheckpointRequest .Defined () << " <br />" ;
198- html << " ContinueRunEvent.WatermarkRequest: " << ContinueRunEvent->WatermarkRequest .Defined () << " <br />" ;
199- html << " ContinueRunEvent.CheckpointOnly: " << ContinueRunEvent->CheckpointOnly << " <br />" ;
200- html << " ContinueRunEvent.MemLimit: " << ContinueRunEvent->MemLimit << " <br />" ;
197+ DUMP_PREFIXED (" ContinueRunEvent." , (*ContinueRunEvent), AskFreeSpace);
198+ DUMP_PREFIXED (" ContinueRunEvent." , (*ContinueRunEvent), CheckpointOnly);
199+ DUMP_PREFIXED (" ContinueRunEvent." , (*ContinueRunEvent), CheckpointRequest, .Defined ());
200+ DUMP_PREFIXED (" ContinueRunEvent." , (*ContinueRunEvent), WatermarkRequest, .Defined ());
201+ DUMP_PREFIXED (" ContinueRunEvent." , (*ContinueRunEvent), MemLimit);
201202 for (const auto & sinkId: ContinueRunEvent->SinkIds ) {
202203 html << " ContinueRunEvent.SinkIds: " << sinkId << " <br />" ;
203204 }
@@ -207,46 +208,45 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
207208 }
208209 }
209210
210- html << " ContinueRunStartWaitTime: " << ContinueRunStartWaitTime.ToString () << " <br />" ;
211- html << " ContinueRunInflight: " << ContinueRunInflight << " <br />" ;
212- html << " CpuTimeSpent: " << CpuTimeSpent.ToString () << " <br />" ;
213- html << " CpuTimeQuotaAsked: " << CpuTimeQuotaAsked.ToString () << " <br />" ;
214- html << " UseCpuQuota: " << UseCpuQuota () << " <br />" ;
215-
211+ DUMP ((*this ), ContinueRunStartWaitTime, .ToString ());
212+ DUMP ((*this ), ContinueRunInflight);
213+ DUMP ((*this ), CpuTimeSpent, .ToString ());
214+ DUMP ((*this ), CpuTimeQuotaAsked, .ToString ());
215+ DUMP ((*this ), UseCpuQuota, ());
216216
217217 html << " <h3>Checkpoints</h3>" ;
218- html << " ReadyToCheckpoint: " << ReadyToCheckpoint () << " <br /> " ;
219- html << " CheckpointRequestedFromTaskRunner: " << CheckpointRequestedFromTaskRunner << " <br /> " ;
218+ DUMP ((* this ), ReadyToCheckpoint, ()) ;
219+ DUMP ((* this ), CheckpointRequestedFromTaskRunner) ;
220220
221221 auto dumpAsyncStats = [&](auto prefix, auto & asyncStats) {
222222 html << prefix << " Level: " << static_cast <int >(asyncStats.Level ) << " <br />" ;
223- html << prefix << " MinWaitDuration: " << asyncStats. MinWaitDuration . ToString () << " <br /> " ;
223+ DUMP_PREFIXED ( prefix, asyncStats, MinWaitDuration, . ToString ()) ;
224224 html << prefix << " CurrentPauseTs: " << (asyncStats.CurrentPauseTs ? asyncStats.CurrentPauseTs ->ToString () : TString{}) << " <br />" ;
225- html << prefix << " MergeWaitPeriod: " << asyncStats. MergeWaitPeriod << " <br /> " ;
226- html << prefix << " Bytes: " << asyncStats. Bytes << " <br /> " ;
227- html << prefix << " DecompressedBytes: " << asyncStats. DecompressedBytes << " <br /> " ;
228- html << prefix << " Rows: " << asyncStats. Rows << " <br /> " ;
229- html << prefix << " Chunks: " << asyncStats. Chunks << " <br /> " ;
230- html << prefix << " Splits: " << asyncStats. Splits << " <br /> " ;
231- html << prefix << " FirstMessageTs: " << asyncStats. FirstMessageTs . ToString () << " <br /> " ;
232- html << prefix << " PauseMessageTs: " << asyncStats. PauseMessageTs . ToString () << " <br /> " ;
233- html << prefix << " ResumeMessageTs: " << asyncStats. ResumeMessageTs . ToString () << " <br /> " ;
234- html << prefix << " LastMessageTs: " << asyncStats. LastMessageTs . ToString () << " <br /> " ;
235- html << prefix << " WaitTime: " << asyncStats. WaitTime . ToString () << " <br /> " ;
225+ DUMP_PREFIXED ( prefix, asyncStats, MergeWaitPeriod) ;
226+ DUMP_PREFIXED ( prefix, asyncStats, Bytes) ;
227+ DUMP_PREFIXED ( prefix, asyncStats, DecompressedBytes) ;
228+ DUMP_PREFIXED ( prefix, asyncStats, Rows) ;
229+ DUMP_PREFIXED ( prefix, asyncStats, Chunks) ;
230+ DUMP_PREFIXED ( prefix, asyncStats, Splits) ;
231+ DUMP_PREFIXED ( prefix, asyncStats, FirstMessageTs, . ToString ()) ;
232+ DUMP_PREFIXED ( prefix, asyncStats, PauseMessageTs, . ToString ()) ;
233+ DUMP_PREFIXED ( prefix, asyncStats, ResumeMessageTs, . ToString ()) ;
234+ DUMP_PREFIXED ( prefix, asyncStats, LastMessageTs, . ToString ()) ;
235+ DUMP_PREFIXED ( prefix, asyncStats, WaitTime, . ToString ()) ;
236236 };
237237
238238 auto dumpOutputStats = [&](auto prefix, auto & outputStats) {
239- html << prefix << " MaxMemoryUsage: " << outputStats. MaxMemoryUsage << " <br /> " ;
240- html << prefix << " MaxRowsInMemory: " << outputStats. MaxRowsInMemory << " <br /> " ;
239+ DUMP_PREFIXED ( prefix, outputStats, MaxMemoryUsage) ;
240+ DUMP_PREFIXED ( prefix, outputStats, MaxRowsInMemory) ;
241241 dumpAsyncStats (prefix, outputStats);
242242 };
243243
244244 auto dumpInputChannelStats = [&](auto prefix, auto & pushStats) {
245- html << prefix << " ChannelId: " << pushStats. ChannelId << " <br /> " ;
246- html << prefix << " SrcStageId: " << pushStats. SrcStageId << " <br /> " ;
247- html << prefix << " RowsInMemory: " << pushStats. RowsInMemory << " <br /> " ;
248- html << prefix << " MaxMemoryUsage: " << pushStats. MaxMemoryUsage << " <br /> " ;
249- html << prefix << " DeserializationTime: " << pushStats. DeserializationTime . ToString () << " <br /> " ;
245+ DUMP_PREFIXED ( prefix, pushStats, ChannelId) ;
246+ DUMP_PREFIXED ( prefix, pushStats, SrcStageId) ;
247+ DUMP_PREFIXED ( prefix, pushStats, RowsInMemory) ;
248+ DUMP_PREFIXED ( prefix, pushStats, MaxMemoryUsage) ;
249+ DUMP_PREFIXED ( prefix, pushStats, DeserializationTime, . ToString ()) ;
250250 dumpAsyncStats (prefix, pushStats);
251251 };
252252
@@ -265,19 +265,32 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
265265 html << " CheckpointingMode: " << NDqProto::ECheckpointingMode_Name (info.CheckpointingMode ) << " <br />" ;
266266 DUMP (info, FreeSpace);
267267 html << " IsPaused: " << info.IsPaused () << " <br />" ;
268- if (info.Channel ) {
269- html << " DqInputChannel.ChannelId: " << info.Channel ->GetChannelId () << " <br />" ;
270- html << " DqInputChannel.FreeSpace: " << info.Channel ->GetFreeSpace () << " <br />" ;
271- html << " DqInputChannel.StoredBytes: " << info.Channel ->GetStoredBytes () << " <br />" ;
272- html << " DqInputChannel.Empty: " << info.Channel ->Empty () << " <br />" ;
273- html << " DqInputChannel.InputType: " << (info.Channel ->GetInputType () ? info.Channel ->GetInputType ()->GetKindAsStr () : TString{" unknown" }) << " <br />" ;
274- html << " DqInputChannel.InputWidth: " << (info.Channel ->GetInputWidth () ? ToString (*info.Channel ->GetInputWidth ()) : TString{" unknown" }) << " <br />" ;
275- html << " DqInputChannel.IsFinished: " << info.Channel ->IsFinished () << " <br />" ;
276-
277- const auto & pushStats = info.Channel ->GetPushStats ();
268+ auto channel = info.Channel ;
269+ if (!channel) {
270+ auto stats = GetTaskRunnerStats ();
271+ if (stats) {
272+ auto stageIt = stats->InputChannels .find (info.SrcStageId );
273+ if (stageIt != stats->InputChannels .end ()) {
274+ auto channelIt = stageIt->second .find (info.ChannelId );
275+ if (channelIt != stageIt->second .end ()) {
276+ channel = channelIt->second ;
277+ }
278+ }
279+ }
280+ }
281+ if (channel) {
282+ html << " DqInputChannel.ChannelId: " << channel->GetChannelId () << " <br />" ;
283+ html << " DqInputChannel.FreeSpace: " << channel->GetFreeSpace () << " <br />" ;
284+ html << " DqInputChannel.StoredBytes: " << channel->GetStoredBytes () << " <br />" ;
285+ html << " DqInputChannel.Empty: " << channel->Empty () << " <br />" ;
286+ html << " DqInputChannel.InputType: " << (channel->GetInputType () ? channel->GetInputType ()->GetKindAsStr () : TString{" unknown" }) << " <br />" ;
287+ html << " DqInputChannel.InputWidth: " << (channel->GetInputWidth () ? ToString (*channel->GetInputWidth ()) : TString{" unknown" }) << " <br />" ;
288+ html << " DqInputChannel.IsFinished: " << channel->IsFinished () << " <br />" ;
289+
290+ const auto & pushStats = channel->GetPushStats ();
278291 dumpInputChannelStats (" DqInputChannel.PushStats." , pushStats);
279292
280- const auto & popStats = info. Channel ->GetPopStats ();
293+ const auto & popStats = channel ->GetPopStats ();
281294 dumpInputStats (" DqInputChannel.PopStats." sv, popStats);
282295 }
283296 }
@@ -290,18 +303,19 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
290303 html << " PendingWatermark: " << !!info.PendingWatermark << " " << (!info.PendingWatermark ? TString{} : info.PendingWatermark ->ToString ()) << " <br />" ;
291304 html << " WatermarksMode: " << NDqProto::EWatermarksMode_Name (info.WatermarksMode ) << " <br />" ;
292305 html << " FreeSpace: " << info.GetFreeSpace () << " <br />" ;
293- if (info.Buffer ) {
294- html << " DqInputBuffer.InputIndex: " << info.Buffer ->GetInputIndex () << " <br />" ;
295- html << " DqInputBuffer.FreeSpace: " << info.Buffer ->GetFreeSpace () << " <br />" ;
296- html << " DqInputBuffer.StoredBytes: " << info.Buffer ->GetStoredBytes () << " <br />" ;
297- html << " DqInputBuffer.Empty: " << info.Buffer ->Empty () << " <br />" ;
298- html << " DqInputBuffer.InputType: " << (info.Buffer ->GetInputType () ? info.Buffer ->GetInputType ()->GetKindAsStr () : TString{" unknown" }) << " <br />" ;
299- html << " DqInputBuffer.InputWidth: " << (info.Buffer ->GetInputWidth () ? ToString (*info.Buffer ->GetInputWidth ()) : TString{" unknown" }) << " <br />" ;
300- html << " DqInputBuffer.IsFinished: " << info.Buffer ->IsFinished () << " <br />" ;
301- html << " DqInputBuffer.IsPaused: " << info.Buffer ->IsPaused () << " <br />" ;
302- html << " DqInputBuffer.IsPending: " << info.Buffer ->IsPending () << " <br />" ;
303-
304- const auto & popStats = info.Buffer ->GetPopStats ();
306+ auto buffer = info.Buffer ;
307+ if (buffer) {
308+ html << " DqInputBuffer.InputIndex: " << buffer->GetInputIndex () << " <br />" ;
309+ html << " DqInputBuffer.FreeSpace: " << buffer->GetFreeSpace () << " <br />" ;
310+ html << " DqInputBuffer.StoredBytes: " << buffer->GetStoredBytes () << " <br />" ;
311+ html << " DqInputBuffer.Empty: " << buffer->Empty () << " <br />" ;
312+ html << " DqInputBuffer.InputType: " << (buffer->GetInputType () ? buffer->GetInputType ()->GetKindAsStr () : TString{" unknown" }) << " <br />" ;
313+ html << " DqInputBuffer.InputWidth: " << (buffer->GetInputWidth () ? ToString (*buffer->GetInputWidth ()) : TString{" unknown" }) << " <br />" ;
314+ html << " DqInputBuffer.IsFinished: " << buffer->IsFinished () << " <br />" ;
315+ html << " DqInputBuffer.IsPaused: " << buffer->IsPaused () << " <br />" ;
316+ html << " DqInputBuffer.IsPending: " << buffer->IsPending () << " <br />" ;
317+
318+ const auto & popStats = buffer->GetPopStats ();
305319 dumpInputStats (" DqInputBuffer." sv, popStats);
306320 }
307321 if (info.AsyncInput ) {
@@ -332,18 +346,32 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
332346 html << " AsyncData.Watermark: " << info.AsyncData ->Watermark << " <br />" ;
333347 }
334348
335- if (info.Channel ) {
336- html << " DqOutputChannel.ChannelId: " << info.Channel ->GetChannelId () << " <br />" ;
337- html << " DqOutputChannel.ValuesCount: " << info.Channel ->GetValuesCount () << " <br />" ;
338- html << " DqOutputChannel.IsFull: " << info.Channel ->IsFull () << " <br />" ;
339- html << " DqOutputChannel.HasData: " << info.Channel ->HasData () << " <br />" ;
340- html << " DqOutputChannel.IsFinished: " << info.Channel ->IsFinished () << " <br />" ;
341- html << " DqInputChannel.OutputType: " << (info.Channel ->GetOutputType () ? info.Channel ->GetOutputType ()->GetKindAsStr () : TString{" unknown" }) << " <br />" ;
349+ auto channel = info.Channel ;
350+ if (!channel) {
351+ auto stats = GetTaskRunnerStats ();
352+ if (stats) {
353+ auto stageIt = stats->OutputChannels .find (info.DstStageId );
354+ if (stageIt != stats->OutputChannels .end ()) {
355+ auto channelIt = stageIt->second .find (info.ChannelId );
356+ if (channelIt != stageIt->second .end ()) {
357+ channel = channelIt->second ;
358+ }
359+ }
360+ }
361+ }
362+
363+ if (channel) {
364+ html << " DqOutputChannel.ChannelId: " << channel->GetChannelId () << " <br />" ;
365+ html << " DqOutputChannel.ValuesCount: " << channel->GetValuesCount () << " <br />" ;
366+ html << " DqOutputChannel.IsFull: " << channel->IsFull () << " <br />" ;
367+ html << " DqOutputChannel.HasData: " << channel->HasData () << " <br />" ;
368+ html << " DqOutputChannel.IsFinished: " << channel->IsFinished () << " <br />" ;
369+ html << " DqInputChannel.OutputType: " << (channel->GetOutputType () ? channel->GetOutputType ()->GetKindAsStr () : TString{" unknown" }) << " <br />" ;
342370
343- const auto & pushStats = info. Channel ->GetPushStats ();
371+ const auto & pushStats = channel ->GetPushStats ();
344372 dumpOutputStats (" DqOutputChannel.PushStats." sv, pushStats);
345373
346- const auto & popStats = info. Channel ->GetPopStats ();
374+ const auto & popStats = channel ->GetPopStats ();
347375 html << " DqOutputChannel.PopStats.ChannelId: " << popStats.ChannelId << " <br />" ;
348376 html << " DqOutputChannel.PopStats.DstStageId: " << popStats.DstStageId << " <br />" ;
349377 html << " DqOutputChannel.PopStats.MaxMemoryUsage: " << popStats.MaxMemoryUsage << " <br />" ;
@@ -364,8 +392,8 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
364392 DUMP (info, Finished);
365393 DUMP (info, FinishIsAcknowledged);
366394 DUMP (info, PopStarted);
367- if (info.Buffer ) {
368- const auto & buffer = *info.Buffer ;
395+ if (info.Buffer || TaskRunnerStats. GetSink (id) ) {
396+ const auto & buffer = info. Buffer ? *info.Buffer : *TaskRunnerStats. GetSink (id) ;
369397 html << " DqOutputBuffer.OutputIndex: " << buffer.GetOutputIndex () << " <br />" ;
370398 html << " DqOutputBuffer.IsFull: " << buffer.IsFull () << " <br />" ;
371399 html << " DqOutputBuffer.OutputType: " << (buffer.GetOutputType () ? buffer.GetOutputType ()->GetKindAsStr () : TString{" unknown" }) << " <br />" ;
@@ -404,6 +432,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
404432 }
405433 }
406434#undef DUMP
435+ #undef DUMP_PREFIXED
407436
408437 Send (ev->Sender , new NActors::NMon::TEvHttpInfoRes (html.Str ()));
409438 }
0 commit comments