@@ -30,6 +30,24 @@ namespace NKikimr::NPQ {
3030
3131static const ui32 MAX_USER_ACTS = 1000 ;
3232
33+ TMaybe<TInstant> GetReadFrom (ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
34+ if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds (1 ))) {
35+ return {};
36+ }
37+
38+ TInstant timestamp = maxTimeLagMs > 0 ? ctx.Now () - TDuration::MilliSeconds (maxTimeLagMs) : TInstant::Zero ();
39+ timestamp = Max (timestamp, TInstant::MilliSeconds (readTimestampMs));
40+ timestamp = Max (timestamp, consumerReadFromTimestamp);
41+ return timestamp;
42+ }
43+
44+ ui64 TPartition::GetReadOffset (ui64 offset, TMaybe<TInstant> readTimestamp) const {
45+ if (!readTimestamp) {
46+ return offset;
47+ }
48+ return Max (GetOffsetEstimate (DataKeysBody, *readTimestamp, Min (Head.Offset , EndOffset - 1 )), offset);
49+ }
50+
3351void TPartition::SendReadingFinished (const TString& consumer) {
3452 Send (Tablet, new TEvPQ::TEvReadingPartitionStatusRequest (consumer, Partition.OriginalPartitionId , TabletGeneration, ++PQRBCookie));
3553}
@@ -133,7 +151,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
133151 };
134152
135153 for (auto request = HasDataRequests.begin (); request != HasDataRequests.end ();) {
136- if (request->Offset < EndOffset) {
154+ if (request->Offset < EndOffset && ( IsActive () || !request-> ReadTimestamp || *request-> ReadTimestamp < EndWriteTimestamp) ) {
137155 auto response = MakeHasDataInfoResponse (GetSizeLag (request->Offset ), request->Cookie );
138156 ctx.Send (request->Sender , response.Release ());
139157 } else if (!IsActive ()) {
@@ -170,16 +188,18 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
170188
171189 auto cookie = record.HasCookie () ? TMaybe<ui64>(record.GetCookie ()) : TMaybe<ui64>();
172190
191+ auto readTimestamp = GetReadFrom (record.GetMaxTimeLagMs (), record.GetReadTimestampMs (), TInstant::Zero () /* TODO */ , ctx);
192+
173193 TActorId sender = ActorIdFromProto (record.GetSender ());
174- if (InitDone && EndOffset > (ui64)record.GetOffset ()) { // already has data, answer right now
194+ if (InitDone && EndOffset > (ui64)record.GetOffset () && (!readTimestamp || EndWriteTimestamp >= *readTimestamp) ) { // already has data, answer right now
175195 auto response = MakeHasDataInfoResponse (GetSizeLag (record.GetOffset ()), cookie);
176196 ctx.Send (sender, response.Release ());
177197 } else if (InitDone && !IsActive ()) {
178198 auto response = MakeHasDataInfoResponse (0 , cookie, true );
179199 ctx.Send (sender, response.Release ());
180200 } else {
181201 THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset (), sender, cookie,
182- record.HasClientId () && InitDone ? record.GetClientId () : " " };
202+ record.HasClientId () && InitDone ? record.GetClientId () : " " , readTimestamp };
183203 THasDataDeadline dl{TInstant::MilliSeconds (record.GetDeadline ()), req};
184204 auto res = HasDataRequests.insert (req);
185205 HasDataDeadlines.insert (dl);
@@ -763,11 +783,10 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
763783 }
764784 userInfo->ReadsInQuotaQueue --;
765785 ui64 offset = read->Offset ;
766- if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds (1 ))) {
767- TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now () - TDuration::MilliSeconds (read->MaxTimeLagMs ) : TInstant::Zero ();
768- timestamp = Max (timestamp, TInstant::MilliSeconds (read->ReadTimestampMs ));
769- timestamp = Max (timestamp, userInfo->ReadFromTimestamp );
770- offset = Max (GetOffsetEstimate (DataKeysBody, timestamp, Min (Head.Offset , EndOffset - 1 )), offset);
786+
787+ auto readTimestamp = GetReadFrom (read->MaxTimeLagMs , read->ReadTimestampMs , userInfo->ReadFromTimestamp , ctx);
788+ if (read->PartNo == 0 && readTimestamp) {
789+ offset = GetReadOffset (offset, readTimestamp);
771790 userInfo->ReadOffsetRewindSum += offset - read->Offset ;
772791 }
773792
0 commit comments