Skip to content

Commit a3e3f9a

Browse files
authored
Merge 2532149 into b13bd4a
2 parents b13bd4a + 2532149 commit a3e3f9a

File tree

16 files changed

+244
-35
lines changed

16 files changed

+244
-35
lines changed

ydb/core/persqueue/blob.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ struct TClientBlob {
101101
void SerializeTo(TBuffer& buffer) const;
102102
static TClientBlob Deserialize(const char *data, ui32 size);
103103

104-
static void CheckBlob(const TKey& key, const TString& blob);
104+
static void CheckBlob(const TKey& key, const TString& blob);
105105
};
106106

107107
static constexpr const ui32 MAX_BLOB_SIZE = 8_MB;
@@ -167,22 +167,35 @@ struct TBatch {
167167
ui64 GetOffset() const {
168168
return Header.GetOffset();
169169
}
170+
170171
ui16 GetPartNo() const {
171172
return Header.GetPartNo();
172173
}
174+
173175
ui32 GetUnpackedSize() const {
174176
return Header.GetUnpackedSize();
175177
}
178+
176179
ui32 GetCount() const {
177180
return Header.GetCount();
178181
}
182+
179183
ui16 GetInternalPartsCount() const {
180184
return Header.GetInternalPartsCount();
181185
}
186+
182187
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
183188
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
184189
}
185190

191+
bool Empty() const {
192+
return Blobs.empty();
193+
}
194+
195+
TInstant GetLastMessageWriteTimestamp() const {
196+
return Blobs.back().WriteTimestamp;
197+
}
198+
186199
TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
187200
: Packed(true)
188201
, Header(header)
@@ -239,7 +252,7 @@ struct THead {
239252
private:
240253
std::deque<TBatch> Batches;
241254
ui16 InternalPartsCount = 0;
242-
255+
243256
friend class TPartitionedBlob;
244257

245258
class TBatchAccessor {

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
356356
meta.SetStartOffset(StartOffset);
357357
meta.SetEndOffset(Max(NewHead.GetNextOffset(), EndOffset));
358358
meta.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
359+
meta.SetLastMessageWriteTimestamp(PendingWriteTimestamp.MilliSeconds());
359360

360361
if (IsSupportive()) {
361362
auto* counterData = meta.MutableCounterData();

ydb/core/persqueue/partition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
116116
friend TInitInfoRangeStep;
117117
friend TInitDataRangeStep;
118118
friend TInitDataStep;
119+
friend TInitEndWriteTimestampStep;
119120

120121
friend TPartitionSourceManager;
121122

@@ -666,6 +667,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
666667
// [DataKeysBody ][DataKeysHead ]
667668
ui64 StartOffset;
668669
ui64 EndOffset;
670+
TInstant EndWriteTimestamp;
671+
TInstant PendingWriteTimestamp;
669672

670673
ui64 WriteInflightSize;
671674
TActorId Tablet;

ydb/core/persqueue/partition_init.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ TInitializer::TInitializer(TPartition* partition)
3030
Steps.push_back(MakeHolder<TInitInfoRangeStep>(this));
3131
Steps.push_back(MakeHolder<TInitDataRangeStep>(this));
3232
Steps.push_back(MakeHolder<TInitDataStep>(this));
33+
Steps.push_back(MakeHolder<TInitEndWriteTimestampStep>(this));
3334

3435
CurrentStep = Steps.begin();
3536
}
@@ -316,6 +317,8 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
316317
}
317318
*/
318319
Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
320+
Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetLastMessageWriteTimestamp());
321+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
319322
if (Partition()->IsSupportive()) {
320323
const auto& counterData = meta.GetCounterData();
321324
Partition()->BytesWrittenGrpc.SetSavedValue(counterData.GetBytesWrittenGrpc());
@@ -653,6 +656,112 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
653656
}
654657

655658

659+
//
660+
// TInitEndWriteTimestampStep
661+
//
662+
663+
TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer)
664+
: TBaseKVStep(initializer, "TInitEndWriteTimestampStep", true) {
665+
}
666+
667+
void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
668+
if (Partition()->EndWriteTimestamp != TInstant::Zero() || Partition()->EndOffset <= Partition()->StartOffset) {
669+
PQ_LOG_D("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
670+
<< "' partition " << Partition()->Partition
671+
<< " skiped because already initialized.");
672+
return Done(ctx);
673+
}
674+
675+
PQ_LOG_D("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
676+
<< "' partition " << Partition()->Partition
677+
<< " EndOffset " << Partition()->EndOffset
678+
<< " StartOffset " << Partition()->StartOffset
679+
<< " .");
680+
681+
auto& head = Partition()->Head;
682+
for (auto it = head.GetBatches().rbegin(); it != head.GetBatches().rend(); ++it) {
683+
if (!it->Empty()) {
684+
Partition()->EndWriteTimestamp = head.GetBatches().back().GetLastMessageWriteTimestamp();
685+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
686+
687+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
688+
<< "' partition " << Partition()->Partition
689+
<< " from head completed. Value " << Partition()->EndWriteTimestamp);
690+
691+
return Done(ctx);
692+
}
693+
}
694+
695+
if (Partition()->DataKeysBody.empty()) {
696+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
697+
<< "' partition " << Partition()->Partition
698+
<< " skiped because DataKeys is empty.");
699+
700+
return Done(ctx);
701+
}
702+
703+
auto& p = Partition()->DataKeysBody.back();
704+
705+
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
706+
auto read = request->Record.AddCmdRead();
707+
read->SetKey({p.Key.Data(), p.Key.Size()});
708+
ctx.Send(Partition()->Tablet, request.Release());
709+
}
710+
711+
void TInitEndWriteTimestampStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) {
712+
if (!ValidateResponse(*this, ev, ctx)) {
713+
PoisonPill(ctx);
714+
return;
715+
}
716+
717+
auto& response = ev->Get()->Record;
718+
Y_ABORT_UNLESS(1 == response.ReadResultSize());
719+
720+
auto& read = response.GetReadResult(0);
721+
Y_ABORT_UNLESS(read.HasStatus());
722+
switch(read.GetStatus()) {
723+
case NKikimrProto::OK: {
724+
auto& key = Partition()->DataKeysBody.back().Key;
725+
726+
for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
727+
auto b = it.GetBatch();
728+
if (!b.Empty()) {
729+
Partition()->EndWriteTimestamp = b.GetLastMessageWriteTimestamp();
730+
}
731+
}
732+
733+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
734+
<< "' partition " << Partition()->Partition
735+
<< " from last blob completed. Value " << Partition()->EndWriteTimestamp);
736+
737+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
738+
739+
break;
740+
}
741+
case NKikimrProto::OVERRUN:
742+
Y_ABORT("implement overrun in readresult!!");
743+
return;
744+
case NKikimrProto::NODATA:
745+
Y_ABORT("NODATA can't be here");
746+
return;
747+
case NKikimrProto::ERROR:
748+
PQ_LOG_ERROR("tablet " << Partition()->TabletID << " HandleOnInit topic '" << TopicName()
749+
<< "' partition " << PartitionId()
750+
<< " status NKikimrProto::ERROR result message: \"" << read.GetMessage()
751+
<< " \" errorReason: \"" << response.GetErrorReason() << "\""
752+
);
753+
PoisonPill(ctx);
754+
return;
755+
default:
756+
Cerr << "ERROR " << read.GetStatus() << " message: \"" << read.GetMessage() << "\"\n";
757+
Y_ABORT("bad status");
758+
759+
};
760+
761+
Done(ctx);
762+
}
763+
764+
656765
//
657766
// TPartition
658767
//

ydb/core/persqueue/partition_init.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,13 @@ class TInitDataStep: public TBaseKVStep {
152152
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
153153
};
154154

155+
class TInitEndWriteTimestampStep: public TBaseKVStep {
156+
friend class TPartitionTestWrapper;
157+
public:
158+
TInitEndWriteTimestampStep(TInitializer* initializer);
159+
160+
void Execute(const TActorContext& ctx) override;
161+
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
162+
};
163+
155164
} // NKikimr::NPQ

ydb/core/persqueue/partition_monitoring.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
5959
PROPERTY("StartOffset", StartOffset);
6060
PROPERTY("EndOffset", EndOffset);
6161
PROPERTY("LastOffset", Head.GetNextOffset());
62+
PROPERTY("Last message WriteTimestamp", EndWriteTimestamp.ToRfc822String());
6263
PROPERTY("HeadOffset", Head.Offset << ", count: " << Head.GetCount());
6364
}
6465
}

ydb/core/persqueue/partition_read.cpp

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,34 @@ namespace NKikimr::NPQ {
3030

3131
static const ui32 MAX_USER_ACTS = 1000;
3232

33+
struct TReadFrom {
34+
35+
TReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp = TInstant())
36+
: MaxTimeLagMs(maxTimeLagMs)
37+
, ReadTimestampMs(readTimestampMs)
38+
, ConsumerReadFromTimestamp(consumerReadFromTimestamp) {
39+
}
40+
41+
bool HasReadTimestamp() const {
42+
return MaxTimeLagMs > 0 || ReadTimestampMs > 0 || ConsumerReadFromTimestamp > TInstant::MilliSeconds(1);
43+
}
44+
45+
TMaybe<TInstant> AsInstant(const TActorContext& ctx) {
46+
if (!HasReadTimestamp()) {
47+
return {};
48+
}
49+
50+
TInstant timestamp = MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(MaxTimeLagMs) : TInstant::Zero();
51+
timestamp = Max(timestamp, TInstant::MilliSeconds(ReadTimestampMs));
52+
timestamp = Max(timestamp, ConsumerReadFromTimestamp);
53+
return timestamp;
54+
}
55+
56+
ui32 MaxTimeLagMs;
57+
ui64 ReadTimestampMs;
58+
TInstant ConsumerReadFromTimestamp;
59+
};
60+
3361
void TPartition::SendReadingFinished(const TString& consumer) {
3462
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie));
3563
}
@@ -133,7 +161,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
133161
};
134162

135163
for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
136-
if (request->Offset < EndOffset) {
164+
if (request->Offset < EndOffset && (!request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
137165
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
138166
ctx.Send(request->Sender, response.Release());
139167
} else if (!IsActive()) {
@@ -170,16 +198,19 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
170198

171199
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
172200

201+
TReadFrom readFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */);
202+
auto readTimestamp = readFrom.AsInstant(ctx);
203+
173204
TActorId sender = ActorIdFromProto(record.GetSender());
174-
if (InitDone && EndOffset > (ui64)record.GetOffset()) { //already has data, answer right now
205+
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp > *readTimestamp)) { //already has data, answer right now
175206
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
176207
ctx.Send(sender, response.Release());
177208
} else if (InitDone && !IsActive()) {
178209
auto response = MakeHasDataInfoResponse(0, cookie, true);
179210
ctx.Send(sender, response.Release());
180211
} else {
181212
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
182-
record.HasClientId() && InitDone ? record.GetClientId() : ""};
213+
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
183214
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
184215
auto res = HasDataRequests.insert(req);
185216
HasDataDeadlines.insert(dl);
@@ -763,11 +794,9 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
763794
}
764795
userInfo->ReadsInQuotaQueue--;
765796
ui64 offset = read->Offset;
766-
if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) {
767-
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
768-
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
769-
timestamp = Max(timestamp, userInfo->ReadFromTimestamp);
770-
offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset);
797+
TReadFrom readFrom(read->MaxTimeLagMs, read->ReadTimestampMs, userInfo->ReadFromTimestamp);
798+
if (read->PartNo == 0 && readFrom.HasReadTimestamp()) {
799+
offset = Max(GetOffsetEstimate(DataKeysBody, *readFrom.AsInstant(ctx), Min(Head.Offset, EndOffset - 1)), offset);
771800
userInfo->ReadOffsetRewindSum += offset - read->Offset;
772801
}
773802

ydb/core/persqueue/partition_util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ struct TPartition::THasDataReq {
113113
TActorId Sender;
114114
TMaybe<ui64> Cookie;
115115
TString ClientId;
116+
TMaybe<TInstant> ReadTimestamp;
116117

117118
bool operator < (const THasDataReq& req) const {
118119
return Num < req.Num;

ydb/core/persqueue/partition_write.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,9 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
381381
void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
382382
PQ_LOG_T("TPartition::SyncMemoryStateWithKVState.");
383383

384-
if (!CompactedKeys.empty())
384+
if (!CompactedKeys.empty()) {
385385
HeadKeys.clear();
386+
}
386387

387388
if (NewHeadKey.Size > 0) {
388389
while (!HeadKeys.empty() &&
@@ -437,6 +438,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
437438
}
438439

439440
EndOffset = Head.GetNextOffset();
441+
EndWriteTimestamp = PendingWriteTimestamp;
442+
440443
NewHead.Clear();
441444
NewHead.Offset = EndOffset;
442445

@@ -1397,12 +1400,19 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
13971400
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
13981401
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
13991402
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
1400-
Head.GetBatch(pp).SerializeTo(valueD);
1403+
auto& batch = Head.GetBatch(pp);
1404+
batch.SerializeTo(valueD);
1405+
if (!batch.Empty()) {
1406+
PendingWriteTimestamp = batch.GetLastMessageWriteTimestamp();
1407+
}
14011408
}
14021409
}
14031410
for (auto& b : NewHead.GetBatches()) {
14041411
Y_ABORT_UNLESS(b.Packed);
14051412
b.SerializeTo(valueD);
1413+
if (!b.Empty()) {
1414+
PendingWriteTimestamp = b.GetLastMessageWriteTimestamp();
1415+
}
14061416
}
14071417

14081418
Y_ABORT_UNLESS(res.second >= valueD.size());

ydb/core/persqueue/read_balancer__balancing_app.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ void TBalancer::RenderApp(NApp::TNavigationBar& __navigationBar) const {
213213
}
214214
TABLER() {
215215
TABLED() { }
216-
TABLED() { __stream << "<__streamong>Total:</__streamong>"; }
216+
TABLED() { __stream << "<strong>Total:</strong>"; }
217217
TABLED() { }
218218
TABLED() { __stream << familyAllCount << " / " << activeFamilyCount << " / " << releasingFamilyCount; }
219219
TABLED() { __stream << (activePartitionCount + inactivePartitionCount + releasingPartitionCount) << " / " << activePartitionCount << " / "

0 commit comments

Comments
 (0)