@@ -313,17 +313,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
313
313
virtual void DoExecuteImpl () = 0;
314
314
virtual void DoTerminateImpl () {}
315
315
316
- virtual bool DoHandleChannelsAfterFinishImpl () {
317
- Y_ABORT_UNLESS (Checkpoints);
318
-
319
- if (Checkpoints->HasPendingCheckpoint () && !Checkpoints->ComputeActorStateSaved () && ReadyToCheckpoint ()) {
320
- Checkpoints->DoCheckpoint ();
321
- }
322
-
323
- // Send checkpoints to output channels.
324
- ProcessOutputsImpl (ERunStatus::Finished);
325
- return true ; // returns true, when channels were handled synchronously
326
- }
316
+ virtual bool DoHandleChannelsAfterFinishImpl () = 0;
327
317
328
318
void ProcessOutputsImpl (ERunStatus status) {
329
319
ProcessOutputsState.LastRunStatus = status;
@@ -621,131 +611,50 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
621
611
}
622
612
}
623
613
624
- public:
625
- i64 GetInputChannelFreeSpace (ui64 channelId) const override {
626
- const TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr (channelId);
627
- YQL_ENSURE (inputChannel, " task: " << Task.GetId () << " , unknown input channelId: " << channelId);
628
-
629
- return inputChannel->Channel ->GetFreeSpace ();
630
- }
631
-
632
- void TakeInputChannelData (TChannelDataOOB&& channelData, bool ack) override {
633
- TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr (channelData.Proto .GetChannelId ());
634
- YQL_ENSURE (inputChannel, " task: " << Task.GetId () << " , unknown input channelId: " << channelData.Proto .GetChannelId ());
635
-
636
- auto channel = inputChannel->Channel ;
637
-
638
- if (channelData.RowCount ()) {
639
- TDqSerializedBatch batch;
640
- batch.Proto = std::move (*channelData.Proto .MutableData ());
641
- batch.Payload = std::move (channelData.Payload );
642
- auto guard = BindAllocator ();
643
- channel->Push (std::move (batch));
644
- }
645
-
646
- if (channelData.Proto .HasCheckpoint ()) {
647
- Y_ABORT_UNLESS (inputChannel->CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
648
- Y_ABORT_UNLESS (Checkpoints);
649
- const auto & checkpoint = channelData.Proto .GetCheckpoint ();
650
- inputChannel->Pause (checkpoint);
651
- Checkpoints->RegisterCheckpoint (checkpoint, channelData.Proto .GetChannelId ());
652
- }
653
-
654
- if (channelData.Proto .GetFinished ()) {
655
- channel->Finish ();
656
- }
657
-
658
- if (ack) {
659
- Channels->SendChannelDataAck (channel->GetChannelId (), channel->GetFreeSpace ());
660
- }
661
-
662
- ContinueExecute (EResumeSource::CATakeInput);
663
- }
664
-
665
- void PeerFinished (ui64 channelId) override {
666
- TOutputChannelInfo* outputChannel = OutputChannelsMap.FindPtr (channelId);
667
- YQL_ENSURE (outputChannel, " task: " << Task.GetId () << " , output channelId: " << channelId);
668
-
669
- outputChannel->Finished = true ;
670
- outputChannel->Channel ->Finish ();
671
-
672
- CA_LOG_D (" task: " << Task.GetId () << " , output channelId: " << channelId << " finished prematurely, "
673
- << " about to clear buffer" );
674
-
675
- {
676
- auto guard = BindAllocator ();
677
- ui32 dropRows = outputChannel->Channel ->Drop ();
614
+ protected: // TDqComputeActorChannels::ICallbacks
615
+ // i64 GetInputChannelFreeSpace(ui64 channelId) is pure and must be overridded in derived class
678
616
679
- CA_LOG_I (" task: " << Task.GetId () << " , output channelId: " << channelId << " finished prematurely, "
680
- << " drop " << dropRows << " rows" );
681
- }
617
+ // void TakeInputChannelData(TChannelDataOOB&& channelData, bool ack) is pure and must be overridded in derived class
682
618
683
- DoExecute ();
684
- }
619
+ // void PeerFinished(ui64 channelId) is pure and must be overridded in derived class
685
620
686
- void ResumeExecution (EResumeSource source) override {
621
+ void ResumeExecution (EResumeSource source) override final {
687
622
ContinueExecute (source);
688
623
}
689
624
690
- void OnSinkStateSaved (NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
625
+ protected:
626
+ void OnSinkStateSaved (NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final {
691
627
Y_ABORT_UNLESS (Checkpoints); // If we are checkpointing, we must have already constructed "checkpoints" object.
692
628
Checkpoints->OnSinkStateSaved (std::move (state), outputIndex, checkpoint);
693
629
}
694
630
695
- void OnTransformStateSaved (NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
631
+ void OnTransformStateSaved (NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final {
696
632
Y_ABORT_UNLESS (Checkpoints); // If we are checkpointing, we must have already constructed "checkpoints" object.
697
633
Checkpoints->OnTransformStateSaved (std::move (state), outputIndex, checkpoint);
698
634
}
699
635
700
- void OnSinkFinished (ui64 outputIndex) override {
636
+ void OnSinkFinished (ui64 outputIndex) override final {
701
637
SinksMap.at (outputIndex).FinishIsAcknowledged = true ;
702
638
ContinueExecute (EResumeSource::CASinkFinished);
703
639
}
704
640
705
- void OnTransformFinished (ui64 outputIndex) override {
641
+ void OnTransformFinished (ui64 outputIndex) override final {
706
642
OutputTransformsMap.at (outputIndex).FinishIsAcknowledged = true ;
707
643
ContinueExecute (EResumeSource::CATransformFinished);
708
644
}
645
+
646
+ protected: // TDqComputeActorCheckpoints::ICallbacks
647
+ // bool ReadyToCheckpoint() is pure and must be overriden in a derived class
709
648
710
- protected:
711
- bool ReadyToCheckpoint () const override {
712
- for (auto & [id, channelInfo] : InputChannelsMap) {
713
- if (channelInfo.CheckpointingMode == NDqProto::CHECKPOINTING_MODE_DISABLED) {
714
- continue ;
715
- }
716
-
717
- if (!channelInfo.IsPaused ()) {
718
- return false ;
719
- }
720
- if (!channelInfo.Channel ->Empty ()) {
721
- return false ;
722
- }
723
- }
724
- return true ;
725
- }
726
-
727
- void CommitState (const NDqProto::TCheckpoint& checkpoint) override {
649
+ void CommitState (const NDqProto::TCheckpoint& checkpoint) override final {
728
650
CA_LOG_D (" Commit state" );
729
651
for (auto & [inputIndex, source] : SourcesMap) {
730
652
Y_ABORT_UNLESS (source.AsyncInput );
731
653
source.AsyncInput ->CommitState (checkpoint);
732
654
}
733
655
}
734
656
735
- void InjectBarrierToOutputs (const NDqProto::TCheckpoint& checkpoint) override {
736
- Y_ABORT_UNLESS (CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
737
- for (const auto & [id, channelInfo] : OutputChannelsMap) {
738
- if (!channelInfo.IsTransformOutput ) {
739
- channelInfo.Channel ->Push (NDqProto::TCheckpoint (checkpoint));
740
- }
741
- }
742
- for (const auto & [outputIndex, sink] : SinksMap) {
743
- sink.Buffer ->Push (NDqProto::TCheckpoint (checkpoint));
744
- }
745
- for (const auto & [outputIndex, transform] : OutputTransformsMap) {
746
- transform.Buffer ->Push (NDqProto::TCheckpoint (checkpoint));
747
- }
748
- }
657
+ // void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) is pure and must be overriden in a derived class
749
658
750
659
void ResumeInputsByWatermark (TInstant watermark) {
751
660
for (auto & [id, sourceInfo] : SourcesMap) {
@@ -771,15 +680,16 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
771
680
}
772
681
}
773
682
774
- void ResumeInputsByCheckpoint () override {
683
+ void ResumeInputsByCheckpoint () override final {
775
684
for (auto & [id, channelInfo] : InputChannelsMap) {
776
685
channelInfo.ResumeByCheckpoint ();
777
686
}
778
687
}
779
688
689
+ protected:
780
690
virtual void DoLoadRunnerState (TString&& blob) = 0;
781
691
782
- void LoadState (NDqProto::TComputeActorState&& state) override {
692
+ void LoadState (NDqProto::TComputeActorState&& state) override final {
783
693
CA_LOG_D (" Load state" );
784
694
TMaybe<TString> error = Nothing ();
785
695
const NDqProto::TMiniKqlProgramState& mkqlProgramState = state.GetMiniKqlProgram ();
@@ -814,13 +724,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
814
724
}
815
725
}
816
726
817
- void Start () override {
727
+ void Start () override final {
818
728
Running = true ;
819
729
State = NDqProto::COMPUTE_STATE_EXECUTING;
820
730
ContinueExecute (EResumeSource::CAStart);
821
731
}
822
732
823
- void Stop () override {
733
+ void Stop () override final {
824
734
Running = false ;
825
735
State = NDqProto::COMPUTE_STATE_UNKNOWN;
826
736
}
@@ -1255,99 +1165,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
1255
1165
NKikimr::NMiniKQL::TScopedAlloc& GetAllocator () {
1256
1166
return *Alloc.get ();
1257
1167
}
1258
- private:
1259
- virtual const TDqMemoryQuota::TProfileStats* GetMemoryProfileStats () const {
1260
- Y_ABORT_UNLESS (MemoryQuota);
1261
- return MemoryQuota->GetProfileStats ();
1262
- }
1263
-
1264
- virtual void DrainOutputChannel (TOutputChannelInfo& outputChannel) {
1265
- YQL_ENSURE (!outputChannel.Finished || Checkpoints);
1266
-
1267
- const bool wasFinished = outputChannel.Finished ;
1268
- auto channelId = outputChannel.Channel ->GetChannelId ();
1269
1168
1270
- CA_LOG_T (" About to drain channelId: " << channelId
1271
- << " , hasPeer: " << outputChannel.HasPeer
1272
- << " , finished: " << outputChannel.Channel ->IsFinished ());
1169
+ virtual const TDqMemoryQuota::TProfileStats* GetMemoryProfileStats () const = 0;
1273
1170
1274
- ProcessOutputsState.HasDataToSend |= !outputChannel.Finished ;
1275
- ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished ;
1171
+ virtual void DrainOutputChannel (TOutputChannelInfo& outputChannel) = 0;
1276
1172
1277
- UpdateBlocked (outputChannel, !Channels->HasFreeMemoryInChannel (channelId));
1278
-
1279
- ui32 sentChunks = 0 ;
1280
- while ((!outputChannel.Finished || Checkpoints) &&
1281
- Channels->HasFreeMemoryInChannel (outputChannel.ChannelId ))
1282
- {
1283
- const static ui32 drainPackSize = 16 ;
1284
- std::vector<typename TOutputChannelInfo::TDrainedChannelMessage> channelData = outputChannel.DrainChannel (drainPackSize);
1285
- ui32 idx = 0 ;
1286
- for (auto && i : channelData) {
1287
- if (auto * w = i.GetWatermarkOptional ()) {
1288
- CA_LOG_I (" Resume inputs by watermark" );
1289
- // This is excessive, inputs should be resumed after async CA received response with watermark from task runner.
1290
- // But, let it be here, it's better to have the same code as in checkpoints
1291
- ResumeInputsByWatermark (TInstant::MicroSeconds (w->GetTimestampUs ()));
1292
- }
1293
- if (i.GetCheckpointOptional ()) {
1294
- CA_LOG_I (" Resume inputs by checkpoint" );
1295
- ResumeInputsByCheckpoint ();
1296
- }
1297
-
1298
- Channels->SendChannelData (i.BuildChannelData (outputChannel.ChannelId ), ++idx == channelData.size ());
1299
- ++sentChunks;
1300
- }
1301
- if (drainPackSize != channelData.size ()) {
1302
- if (!outputChannel.Finished ) {
1303
- CA_LOG_T (" output channelId: " << outputChannel.ChannelId << " , nothing to send and is not finished" );
1304
- }
1305
- break ;
1306
- }
1307
- }
1308
-
1309
- ProcessOutputsState.HasDataToSend |= !outputChannel.Finished ;
1310
- ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished ;
1311
- ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished ) || sentChunks;
1312
- }
1313
-
1314
- virtual void DrainAsyncOutput (ui64 outputIndex, TAsyncOutputInfoBase& outputInfo) {
1315
- ProcessOutputsState.AllOutputsFinished &= outputInfo.Finished ;
1316
- if (outputInfo.Finished && !Checkpoints) {
1317
- return ;
1318
- }
1319
-
1320
- Y_ABORT_UNLESS (outputInfo.Buffer );
1321
- Y_ABORT_UNLESS (outputInfo.AsyncOutput );
1322
- Y_ABORT_UNLESS (outputInfo.Actor );
1323
-
1324
- const ui32 allowedOvercommit = AllowedChannelsOvercommit ();
1325
- const i64 sinkFreeSpaceBeforeSend = outputInfo.AsyncOutput ->GetFreeSpace ();
1326
-
1327
- i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit;
1328
- CA_LOG_D (" About to drain async output " << outputIndex
1329
- << " . FreeSpace: " << sinkFreeSpaceBeforeSend
1330
- << " , allowedOvercommit: " << allowedOvercommit
1331
- << " , toSend: " << toSend
1332
- << " , finished: " << outputInfo.Buffer ->IsFinished ());
1333
-
1334
- i64 sent = 0 ;
1335
- while (toSend > 0 && (!outputInfo.Finished || Checkpoints)) {
1336
- const ui32 sentChunk = SendDataChunkToAsyncOutput (outputIndex, outputInfo, toSend);
1337
- if (sentChunk == 0 ) {
1338
- break ;
1339
- }
1340
- sent += sentChunk;
1341
- toSend = outputInfo.AsyncOutput ->GetFreeSpace () + allowedOvercommit;
1342
- }
1343
-
1344
- CA_LOG_D (" Drain async output " << outputIndex
1345
- << " . Free space decreased: " << (sinkFreeSpaceBeforeSend - outputInfo.AsyncOutput ->GetFreeSpace ())
1346
- << " , sent data from buffer: " << sent);
1347
-
1348
- ProcessOutputsState.HasDataToSend |= !outputInfo.Finished ;
1349
- ProcessOutputsState.DataWasSent |= outputInfo.Finished || sent;
1350
- }
1173
+ virtual void DrainAsyncOutput (ui64 outputIndex, TAsyncOutputInfoBase& outputInfo) = 0;
1351
1174
1352
1175
ui32 SendDataChunkToAsyncOutput (ui64 outputIndex, TAsyncOutputInfoBase& outputInfo, ui64 bytes) {
1353
1176
auto sink = outputInfo.Buffer ;
@@ -1599,7 +1422,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
1599
1422
InternalError (fatalCode, issues);
1600
1423
}
1601
1424
1602
- void OnSinkError (ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override {
1425
+ void OnSinkError (ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final {
1603
1426
if (fatalCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
1604
1427
SinksMap.at (outputIndex).IssuesBuffer .Push (issues);
1605
1428
return ;
@@ -1609,7 +1432,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
1609
1432
InternalError (fatalCode, issues);
1610
1433
}
1611
1434
1612
- void OnOutputTransformError (ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override {
1435
+ void OnOutputTransformError (ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final {
1613
1436
if (fatalCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
1614
1437
OutputTransformsMap.at (outputIndex).IssuesBuffer .Push (issues);
1615
1438
return ;
@@ -1635,7 +1458,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
1635
1458
return true ;
1636
1459
}
1637
1460
1638
- virtual ui64 CalcMkqlMemoryLimit () {
1461
+ ui64 CalcMkqlMemoryLimit () {
1639
1462
auto & opts = Task.GetProgram ().GetSettings ();
1640
1463
return opts.GetHasMapJoin ()/* || opts.GetHasSort()*/
1641
1464
? MemoryLimits.MkqlHeavyProgramMemoryLimit
@@ -1738,9 +1561,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
1738
1561
virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats () = 0;
1739
1562
virtual const NYql::NDq::TDqMeteringStats* GetMeteringStats () = 0;
1740
1563
1741
- virtual const IDqAsyncOutputBuffer* GetSink (ui64, const TAsyncOutputInfoBase& sinkInfo) const {
1742
- return sinkInfo.Buffer .Get ();
1743
- }
1564
+ virtual const IDqAsyncOutputBuffer* GetSink (ui64 outputIdx, const TAsyncOutputInfoBase& sinkInfo) const = 0;
1744
1565
1745
1566
public:
1746
1567
0 commit comments