@@ -7,6 +7,27 @@ using namespace NActors;
77
88namespace NYql ::NDq {
99
10+ struct TMockChannelCallbacks : public TDqComputeActorChannels ::ICallbacks
11+ {
12+ i64 GetInputChannelFreeSpace (ui64 channelId) const override {
13+ Y_UNUSED (channelId);
14+ return 100 ;
15+ }
16+
17+ void TakeInputChannelData (TChannelDataOOB&& channelData, bool ack) override {
18+ Y_UNUSED (channelData);
19+ Y_UNUSED (ack);
20+ }
21+
22+ void PeerFinished (ui64 channelId) override {
23+ Y_UNUSED (channelId);
24+ }
25+
26+ void ResumeExecution (EResumeSource source) override {
27+ Y_UNUSED (source);
28+ }
29+ };
30+
1031struct TActorSystem : NActors::TTestActorRuntimeBase
1132{
1233 TString UnusedComponent = TString(" Unused" );
@@ -26,13 +47,18 @@ struct TActorSystem: NActors::TTestActorRuntimeBase
2647struct TChannelsTestFixture : public NUnitTest ::TBaseFixture
2748{
2849 TActorSystem ActorSystem;
29- TActorId Channels;
50+ TDqComputeActorChannels* Channels;
51+ TDqComputeActorChannels::TInputChannelState* InputChannel;
52+ TActorId ChannelsId;
3053 TActorId EdgeActor;
54+ TMockChannelCallbacks Callbacks;
3155
3256 void SetUp (NUnitTest::TTestContext& /* context */ ) override
3357 {
3458 ActorSystem.Start ();
3559
60+ EdgeActor = ActorSystem.AllocateEdgeActor ();
61+
3662 NDqProto::TDqTask task;
3763 auto channels = std::unique_ptr<TDqComputeActorChannels>(new TDqComputeActorChannels (
3864 /* ParentActorId = */ {},
@@ -41,15 +67,36 @@ struct TChannelsTestFixture: public NUnitTest::TBaseFixture
4167 /* retry = */ true ,
4268 NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE,
4369 /* channelBufferSize = */ 1000000 ,
44- /* callbacks = */ nullptr ,
70+ /* callbacks = */ &Callbacks ,
4571 /* activityType = */ 0
4672 ));
4773 channels->InputChannelsMap .emplace ((ui64)0 , TDqComputeActorChannels::TInputChannelState {});
74+ InputChannel = &channels->InCh (0 );
75+ Channels = channels.release ();
76+ ChannelsId = ActorSystem.Register (Channels);
77+ }
4878
49- Channels = ActorSystem.Register (channels.release ());
50- ActorSystem.DispatchEvents (
51- {.FinalEvents = {{TEvents::TSystem::Bootstrap}}},
52- TDuration::MilliSeconds (10 ));
79+ void RunInActorContext (const std::function<void (void )>& f) {
80+ bool executed = false ;
81+ auto prev = ActorSystem.SetEventFilter ([&](TTestActorRuntimeBase &, TAutoPtr<IEventHandle> &) -> bool {
82+ if (!executed) {
83+ executed = true ;
84+ f ();
85+ return true ;
86+ } else {
87+ return false ;
88+ }
89+ });
90+ ActorSystem.Send (
91+ EdgeActor,
92+ TActorId{},
93+ new TEvents::TEvWakeup
94+ );
95+ ActorSystem.SetEventFilter (prev);
96+ }
97+
98+ void SendAck (i64 freeSpace) {
99+ Channels->SendChannelDataAck (*InputChannel, freeSpace);
53100 }
54101};
55102
@@ -59,10 +106,28 @@ Y_UNIT_TEST_SUITE(TComputeActorTest) {
59106 Y_UNIT_TEST_F (ReceiveData, TChannelsTestFixture) {
60107 auto ev = MakeHolder<TEvDqCompute::TEvChannelData>();
61108 ActorSystem.Send (
62- Channels ,
109+ ChannelsId ,
63110 EdgeActor,
64111 ev.Release ()
65112 );
113+
114+ UNIT_ASSERT (!InputChannel->Peer .has_value ());
115+
116+ ev = MakeHolder<TEvDqCompute::TEvChannelData>();
117+ ev->Record .SetSeqNo (1 );
118+
119+ ActorSystem.Send (
120+ ChannelsId,
121+ EdgeActor,
122+ ev.Release ()
123+ );
124+
125+ UNIT_ASSERT (InputChannel->Peer == EdgeActor);
126+ UNIT_ASSERT (InputChannel->InFlight .size () == 0 );
127+
128+ RunInActorContext ([&]() { SendAck (10 ); });
129+
130+ UNIT_ASSERT (InputChannel->InFlight .size () == 1 );
66131 }
67132}
68133
0 commit comments