@@ -135,6 +135,32 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
135135 return res;
136136}
137137
138+ bool TPartition::ProcessHasDataRequest (const THasDataReq& request, const TActorContext& ctx) {
139+ auto sendResponse = [&](ui64 lagSize, bool readingFinished) {
140+ auto response = MakeHasDataInfoResponse (lagSize, request.Cookie , readingFinished);
141+ ctx.Send (request.Sender , response.Release ());
142+ };
143+
144+ if (!IsActive ()) {
145+ if (request.Offset < EndOffset && (!request.ReadTimestamp || *request.ReadTimestamp <= EndWriteTimestamp)) {
146+ sendResponse (GetSizeLag (request.Offset ), false );
147+ } else {
148+ sendResponse (0 , true );
149+
150+ auto now = ctx.Now ();
151+ auto & userInfo = UsersInfoStorage->GetOrCreate (request.ClientId , ctx);
152+ userInfo.UpdateReadOffset ((i64 )EndOffset - 1 , now, now, now, true );
153+ }
154+ } else if (request.Offset < EndOffset) {
155+ sendResponse (GetSizeLag (request.Offset ), false );
156+ } else {
157+ return false ;
158+ }
159+
160+ return true ;
161+ }
162+
163+
138164void TPartition::ProcessHasDataRequests (const TActorContext& ctx) {
139165 if (!InitDone) {
140166 return ;
@@ -150,13 +176,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
150176 };
151177
152178 for (auto request = HasDataRequests.begin (); request != HasDataRequests.end ();) {
153- if (request->Offset < EndOffset && (IsActive () || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
154- auto response = MakeHasDataInfoResponse (GetSizeLag (request->Offset ), request->Cookie );
155- ctx.Send (request->Sender , response.Release ());
156- } else if (!IsActive ()) {
157- auto response = MakeHasDataInfoResponse (0 , request->Cookie , true );
158- ctx.Send (request->Sender , response.Release ());
159- } else {
179+ if (!ProcessHasDataRequest (*request, ctx)) {
160180 break ;
161181 }
162182
@@ -185,33 +205,23 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
185205 auto & record = ev->Get ()->Record ;
186206 Y_ABORT_UNLESS (record.HasSender ());
187207
188- auto cookie = record. HasCookie () ? TMaybe<ui64>(record. GetCookie ()) : TMaybe<ui64> ();
208+ auto now = ctx. Now ();
189209
210+ auto cookie = record.HasCookie () ? TMaybe<ui64>(record.GetCookie ()) : TMaybe<ui64>();
190211 auto readTimestamp = GetReadFrom (record.GetMaxTimeLagMs (), record.GetReadTimestampMs (), TInstant::Zero () /* TODO */ , ctx);
191212
192213 TActorId sender = ActorIdFromProto (record.GetSender ());
193- if (InitDone && EndOffset > (ui64)record.GetOffset () && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { // already has data, answer right now
194- auto response = MakeHasDataInfoResponse (GetSizeLag (record.GetOffset ()), cookie);
195- ctx.Send (sender, response.Release ());
196- } else if (InitDone && !IsActive ()) {
197- auto now = ctx.Now ();
198214
199- auto & userInfo = UsersInfoStorage-> GetOrCreate ( record.GetClientId (), ctx);
200- userInfo. UpdateReadOffset (( i64 )EndOffset - 1 , now, now, now, true ) ;
215+ THasDataReq req{++HasDataReqNum, (ui64) record.GetOffset (), sender, cookie,
216+ record. HasClientId () && InitDone ? record. GetClientId () : " " , readTimestamp} ;
201217
202- auto response = MakeHasDataInfoResponse (0 , cookie, true );
203- ctx.Send (sender, response.Release ());
204- } else {
205- THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset (), sender, cookie,
206- record.HasClientId () && InitDone ? record.GetClientId () : " " , readTimestamp};
218+ if (!InitDone || !ProcessHasDataRequest (req, ctx)) {
207219 THasDataDeadline dl{TInstant::MilliSeconds (record.GetDeadline ()), req};
208- auto res = HasDataRequests.insert (req);
220+ auto res = HasDataRequests.insert (std::move ( req) );
209221 HasDataDeadlines.insert (dl);
210222 Y_ABORT_UNLESS (res.second );
211223
212224 if (InitDone && record.HasClientId () && !record.GetClientId ().empty ()) {
213- auto now = ctx.Now ();
214-
215225 auto & userInfo = UsersInfoStorage->GetOrCreate (record.GetClientId (), ctx);
216226 ++userInfo.Subscriptions ;
217227 userInfo.UpdateReadOffset ((i64 )EndOffset - 1 , now, now, now);
0 commit comments