Skip to content

Commit 6d9da7e

Browse files
authored
Fix metrics of inactive partitions (#12543)
1 parent 8d885f6 commit 6d9da7e

File tree

5 files changed

+54
-38
lines changed

5 files changed

+54
-38
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,10 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
332332
ctx.Send(Tablet, new TEvPQ::TEvMetering(EMeteringJson::UsedStorageV1, usedStorage));
333333
}
334334

335+
if (ManageWriteTimestampEstimate || !IsActive()) {
336+
WriteTimestampEstimate = ctx.Now();
337+
}
338+
335339
ReportCounters(ctx);
336340

337341
ProcessHasDataRequests(ctx);
@@ -712,12 +716,14 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
712716
}
713717
continue;
714718
}
719+
720+
userInfo.EndOffset = EndOffset;
721+
715722
if (clientId == userInfo.User) { //fill lags
716723
NKikimrPQ::TClientInfo* clientInfo = result.MutableLagsInfo();
717724
clientInfo->SetClientId(userInfo.User);
718725
auto write = clientInfo->MutableWritePosition();
719726
write->SetOffset(userInfo.Offset);
720-
userInfo.EndOffset = EndOffset;
721727
write->SetWriteTimestamp((userInfo.GetWriteTimestamp() ? userInfo.GetWriteTimestamp() : GetWriteTimeEstimate(userInfo.Offset)).MilliSeconds());
722728
write->SetCreateTimestamp(userInfo.GetCreateTimestamp().MilliSeconds());
723729
auto read = clientInfo->MutableReadPosition();
@@ -727,32 +733,43 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
727733
write->SetSize(GetSizeLag(userInfo.Offset));
728734
read->SetSize(GetSizeLag(userInfo.GetReadOffset()));
729735

730-
clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset
731-
? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(read->GetWriteTimestamp())).MilliSeconds()
732-
: 0);
733736
clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds());
734-
clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs());
735-
ui64 totalLag = clientInfo->GetReadLagMs() + userInfo.GetWriteLagMs() + (ctx.Now() - userInfo.GetReadTimestamp()).MilliSeconds();
736-
clientInfo->SetTotalLagMs(totalLag);
737+
if (IsActive() || userInfo.GetReadOffset() < (i64)EndOffset) {
738+
clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset
739+
? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(read->GetWriteTimestamp())).MilliSeconds()
740+
: 0);
741+
clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs());
742+
ui64 totalLag = clientInfo->GetReadLagMs() + userInfo.GetWriteLagMs() + (ctx.Now() - userInfo.GetReadTimestamp()).MilliSeconds();
743+
clientInfo->SetTotalLagMs(totalLag);
744+
} else {
745+
clientInfo->SetReadLagMs(0);
746+
clientInfo->SetWriteLagMs(0);
747+
clientInfo->SetTotalLagMs(0);
748+
}
737749
}
738750

739751
if (ev->Get()->GetStatForAllConsumers) { //fill lags
740752
auto* clientInfo = result.AddConsumerResult();
741753
clientInfo->SetConsumer(userInfo.User);
742-
auto readTimestamp = (userInfo.GetReadWriteTimestamp() ? userInfo.GetReadWriteTimestamp() : GetWriteTimeEstimate(userInfo.GetReadOffset())).MilliSeconds();
743-
clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset
744-
? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(readTimestamp)).MilliSeconds()
745-
: 0);
746754
clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds());
747-
clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs());
755+
756+
if (IsActive() || userInfo.GetReadOffset() < (i64)EndOffset) {
757+
auto readTimestamp = (userInfo.GetReadWriteTimestamp() ? userInfo.GetReadWriteTimestamp() : GetWriteTimeEstimate(userInfo.GetReadOffset())).MilliSeconds();
758+
clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset
759+
? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(readTimestamp)).MilliSeconds()
760+
: 0);
761+
clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs());
762+
} else {
763+
clientInfo->SetReadLagMs(0);
764+
clientInfo->SetWriteLagMs(0);
765+
}
748766

749767
clientInfo->SetAvgReadSpeedPerMin(userInfo.AvgReadBytes[1].GetValue());
750768
clientInfo->SetAvgReadSpeedPerHour(userInfo.AvgReadBytes[2].GetValue());
751769
clientInfo->SetAvgReadSpeedPerDay(userInfo.AvgReadBytes[3].GetValue());
752770

753771
clientInfo->SetReadingFinished(LastOffsetHasBeenCommited(userInfo));
754772
}
755-
756773
}
757774

758775
result.SetStartOffset(StartOffset);

ydb/core/persqueue/partition_read.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,11 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
194194
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
195195
ctx.Send(sender, response.Release());
196196
} else if (InitDone && !IsActive()) {
197+
auto now = ctx.Now();
198+
199+
auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
200+
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
201+
197202
auto response = MakeHasDataInfoResponse(0, cookie, true);
198203
ctx.Send(sender, response.Release());
199204
} else {

ydb/core/persqueue/user_info.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,12 +269,12 @@ struct TUserInfo: public TUserInfoBase {
269269

270270
}
271271

272-
void UpdateReadOffset(const i64 offset, TInstant writeTimestamp, TInstant createTimestamp, TInstant now) {
272+
void UpdateReadOffset(const i64 offset, TInstant writeTimestamp, TInstant createTimestamp, TInstant now, bool force = false) {
273273
ReadOffset = offset;
274274
ReadWriteTimestamp = writeTimestamp;
275275
ReadCreateTimestamp = createTimestamp;
276276
WriteLagMs.Update((ReadWriteTimestamp - ReadCreateTimestamp).MilliSeconds(), ReadWriteTimestamp);
277-
if (Subscriptions > 0) {
277+
if (Subscriptions > 0 || force) {
278278
ReadTimestamp = now;
279279
}
280280
}

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2433,10 +2433,6 @@ void TPartitionActor::InitLockPartition(const TActorContext& ctx) {
24332433

24342434

24352435
void TPartitionActor::WaitDataInPartition(const TActorContext& ctx) {
2436-
if (ReadingFinishedSent) {
2437-
return;
2438-
}
2439-
24402436
if (WaitDataInfly.size() > 1) { //already got 2 requests inflight
24412437
return;
24422438
}
@@ -2507,12 +2503,14 @@ void TPartitionActor::Handle(TEvPersQueue::TEvHasDataInfoResponse::TPtr& ev, con
25072503
EndOffset = record.GetEndOffset();
25082504
SizeLag = record.GetSizeLag();
25092505

2510-
if (ReadOffset < EndOffset) {
2511-
WaitForData = false;
2512-
WaitDataInfly.clear();
2513-
SendPartitionReady(ctx);
2514-
} else if (PipeClient) {
2515-
WaitDataInPartition(ctx);
2506+
if (!record.GetReadingFinished()) {
2507+
if (ReadOffset < EndOffset) {
2508+
WaitForData = false;
2509+
WaitDataInfly.clear();
2510+
SendPartitionReady(ctx);
2511+
} else if (PipeClient) {
2512+
WaitDataInPartition(ctx);
2513+
}
25162514
}
25172515

25182516
if (!ReadingFinishedSent) {

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,10 +1004,6 @@ void TPartitionActor::WaitDataInPartition(const TActorContext& ctx) {
10041004
return;
10051005
}
10061006

1007-
if (ReadingFinishedSent) {
1008-
return;
1009-
}
1010-
10111007
Y_ABORT_UNLESS(InitDone);
10121008
Y_ABORT_UNLESS(PipeClient);
10131009
Y_ABORT_UNLESS(ReadOffset >= EndOffset || MaxTimeLagMs || ReadTimestampMs);
@@ -1068,12 +1064,14 @@ void TPartitionActor::Handle(TEvPersQueue::TEvHasDataInfoResponse::TPtr& ev, con
10681064
EndOffset = record.GetEndOffset();
10691065
SizeLag = record.GetSizeLag();
10701066

1071-
if (ReadOffset < EndOffset && !record.GetReadingFinished()) {
1072-
WaitForData = false;
1073-
WaitDataInfly.clear();
1074-
SendPartitionReady(ctx);
1075-
} else if (PipeClient) {
1076-
WaitDataInPartition(ctx);
1067+
if (!record.GetReadingFinished()) {
1068+
if (ReadOffset < EndOffset) {
1069+
WaitForData = false;
1070+
WaitDataInfly.clear();
1071+
SendPartitionReady(ctx);
1072+
} else if (PipeClient) {
1073+
WaitDataInPartition(ctx);
1074+
}
10771075
}
10781076

10791077
if (!ReadingFinishedSent) {
@@ -1236,9 +1234,7 @@ void TPartitionActor::Handle(TEvPQProxy::TEvDeadlineExceeded::TPtr& ev, const TA
12361234

12371235
void TPartitionActor::HandleWakeup(const TActorContext& ctx) {
12381236
DoWakeup(ctx);
1239-
if (!ReadingFinishedSent) {
1240-
ctx.Schedule(PREWAIT_DATA, new TEvents::TEvWakeup());
1241-
}
1237+
ctx.Schedule(PREWAIT_DATA, new TEvents::TEvWakeup());
12421238
}
12431239

12441240
void TPartitionActor::DoWakeup(const TActorContext& ctx) {

0 commit comments

Comments
 (0)