@@ -23,13 +23,15 @@ TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data)
2323{
2424}
2525
26- TEvWorker::TEvData::TEvData (const TVector<TRecord>& records)
27- : Records(records)
26+ TEvWorker::TEvData::TEvData (const TString& source, const TVector<TRecord>& records)
27+ : Source(source)
28+ , Records(records)
2829{
2930}
3031
31- TEvWorker::TEvData::TEvData (TVector<TRecord>&& records)
32- : Records(std::move(records))
32+ TEvWorker::TEvData::TEvData (const TString& source, TVector<TRecord>&& records)
33+ : Source(source)
34+ , Records(std::move(records))
3335{
3436}
3537
@@ -42,6 +44,7 @@ void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {
4244
4345TString TEvWorker::TEvData::ToString () const {
4446 return TStringBuilder () << ToStringHeader () << " {"
47+ << " Source: " << Source
4548 << " Records [" << JoinSeq (" ," , Records) << " ]"
4649 << " }" ;
4750}
@@ -115,16 +118,16 @@ class TWorker: public TActorBootstrapped<TWorker> {
115118 << " : sender# " << ev->Sender );
116119
117120 Reader.Registered ();
118- if (!InFlightRecords ) {
121+ if (!InFlightData ) {
119122 Send (Reader, new TEvWorker::TEvPoll ());
120123 }
121124 } else if (ev->Sender == Writer) {
122125 LOG_I (" Handshake with writer"
123126 << " : sender# " << ev->Sender );
124127
125128 Writer.Registered ();
126- if (InFlightRecords ) {
127- Send (Writer, new TEvWorker::TEvData (InFlightRecords ));
129+ if (InFlightData ) {
130+ Send (Writer, new TEvWorker::TEvData (InFlightData-> Source , InFlightData-> Records ));
128131 }
129132 } else {
130133 LOG_W (" Handshake from unknown actor"
@@ -142,7 +145,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
142145 return ;
143146 }
144147
145- InFlightRecords. clear ();
148+ InFlightData. Reset ();
146149 if (Reader) {
147150 Send (ev->Forward (Reader));
148151 }
@@ -157,8 +160,8 @@ class TWorker: public TActorBootstrapped<TWorker> {
157160 return ;
158161 }
159162
160- Y_ABORT_UNLESS (InFlightRecords. empty () );
161- InFlightRecords = ev->Get ()->Records ;
163+ Y_ABORT_UNLESS (!InFlightData );
164+ InFlightData = MakeHolder<TEvWorker::TEvData>( ev->Get ()->Source , ev-> Get ()-> Records ) ;
162165
163166 if (Writer) {
164167 Send (ev->Forward (Writer));
@@ -239,7 +242,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
239242 mutable TMaybe<TString> LogPrefix;
240243 TActorInfo Reader;
241244 TActorInfo Writer;
242- TVector <TEvWorker::TEvData::TRecord> InFlightRecords ;
245+ THolder <TEvWorker::TEvData> InFlightData ;
243246};
244247
245248IActor* CreateWorker (std::function<IActor*(void )>&& createReaderFn, std::function<IActor*(void )>&& createWriterFn) {
0 commit comments