Skip to content

Commit 1f341b9

Browse files
authored
Merge aeb937d into f5beef5
2 parents f5beef5 + aeb937d commit 1f341b9

File tree

16 files changed

+291
-36
lines changed

16 files changed

+291
-36
lines changed

ydb/core/persqueue/blob.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ struct TClientBlob {
101101
void SerializeTo(TBuffer& buffer) const;
102102
static TClientBlob Deserialize(const char *data, ui32 size);
103103

104-
static void CheckBlob(const TKey& key, const TString& blob);
104+
static void CheckBlob(const TKey& key, const TString& blob);
105105
};
106106

107107
static constexpr const ui32 MAX_BLOB_SIZE = 8_MB;
@@ -167,22 +167,35 @@ struct TBatch {
167167
ui64 GetOffset() const {
168168
return Header.GetOffset();
169169
}
170+
170171
ui16 GetPartNo() const {
171172
return Header.GetPartNo();
172173
}
174+
173175
ui32 GetUnpackedSize() const {
174176
return Header.GetUnpackedSize();
175177
}
178+
176179
ui32 GetCount() const {
177180
return Header.GetCount();
178181
}
182+
179183
ui16 GetInternalPartsCount() const {
180184
return Header.GetInternalPartsCount();
181185
}
186+
182187
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
183188
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
184189
}
185190

191+
bool Empty() const {
192+
return Blobs.empty();
193+
}
194+
195+
TInstant GetLastMessageWriteTimestamp() const {
196+
return Blobs.back().WriteTimestamp;
197+
}
198+
186199
TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
187200
: Packed(true)
188201
, Header(header)
@@ -239,7 +252,7 @@ struct THead {
239252
private:
240253
std::deque<TBatch> Batches;
241254
ui16 InternalPartsCount = 0;
242-
255+
243256
friend class TPartitionedBlob;
244257

245258
class TBatchAccessor {

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
356356
meta.SetStartOffset(StartOffset);
357357
meta.SetEndOffset(Max(NewHead.GetNextOffset(), EndOffset));
358358
meta.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
359+
meta.SetLastMessageWriteTimestamp(PendingWriteTimestamp.MilliSeconds());
359360

360361
if (IsSupportive()) {
361362
auto* counterData = meta.MutableCounterData();

ydb/core/persqueue/partition.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
116116
friend TInitInfoRangeStep;
117117
friend TInitDataRangeStep;
118118
friend TInitDataStep;
119+
friend TInitEndWriteTimestampStep;
119120

120121
friend TPartitionSourceManager;
121122

@@ -440,6 +441,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
440441
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
441442
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
442443

444+
ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const;
443445

444446
public:
445447
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -666,6 +668,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
666668
// [DataKeysBody ][DataKeysHead ]
667669
ui64 StartOffset;
668670
ui64 EndOffset;
671+
TInstant EndWriteTimestamp;
672+
TInstant PendingWriteTimestamp;
669673

670674
ui64 WriteInflightSize;
671675
TActorId Tablet;

ydb/core/persqueue/partition_init.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ TInitializer::TInitializer(TPartition* partition)
3030
Steps.push_back(MakeHolder<TInitInfoRangeStep>(this));
3131
Steps.push_back(MakeHolder<TInitDataRangeStep>(this));
3232
Steps.push_back(MakeHolder<TInitDataStep>(this));
33+
Steps.push_back(MakeHolder<TInitEndWriteTimestampStep>(this));
3334

3435
CurrentStep = Steps.begin();
3536
}
@@ -316,6 +317,8 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
316317
}
317318
*/
318319
Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
320+
Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetLastMessageWriteTimestamp());
321+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
319322
if (Partition()->IsSupportive()) {
320323
const auto& counterData = meta.GetCounterData();
321324
Partition()->BytesWrittenGrpc.SetSavedValue(counterData.GetBytesWrittenGrpc());
@@ -653,6 +656,112 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
653656
}
654657

655658

659+
//
660+
// TInitEndWriteTimestampStep
661+
//
662+
663+
TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer)
664+
: TBaseKVStep(initializer, "TInitEndWriteTimestampStep", true) {
665+
}
666+
667+
void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
668+
if (Partition()->EndWriteTimestamp != TInstant::Zero() || (Partition()->HeadKeys.empty() && Partition()->DataKeysBody.empty())) {
669+
PQ_LOG_D("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
670+
<< "' partition " << Partition()->Partition
671+
<< " skiped because already initialized.");
672+
return Done(ctx);
673+
}
674+
675+
PQ_LOG_D("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
676+
<< "' partition " << Partition()->Partition
677+
<< ".");
678+
679+
auto& head = Partition()->Head;
680+
for (auto it = head.GetBatches().rbegin(); it != head.GetBatches().rend(); ++it) {
681+
if (!it->Empty()) {
682+
Partition()->EndWriteTimestamp = head.GetBatches().back().GetLastMessageWriteTimestamp();
683+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
684+
685+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
686+
<< "' partition " << Partition()->Partition
687+
<< " from head completed. Value " << Partition()->EndWriteTimestamp);
688+
689+
return Done(ctx);
690+
}
691+
}
692+
693+
if (Partition()->DataKeysBody.empty()) {
694+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
695+
<< "' partition " << Partition()->Partition
696+
<< " skiped because DataKeys is empty.");
697+
698+
return Done(ctx);
699+
}
700+
701+
auto& p = Partition()->DataKeysBody.back();
702+
703+
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
704+
auto read = request->Record.AddCmdRead();
705+
read->SetKey({p.Key.Data(), p.Key.Size()});
706+
707+
ctx.Send(Partition()->Tablet, request.Release());
708+
}
709+
710+
void TInitEndWriteTimestampStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) {
711+
if (!ValidateResponse(*this, ev, ctx)) {
712+
PoisonPill(ctx);
713+
return;
714+
}
715+
716+
auto& response = ev->Get()->Record;
717+
Y_ABORT_UNLESS(1 == response.ReadResultSize());
718+
719+
auto& read = response.GetReadResult(0);
720+
Y_ABORT_UNLESS(read.HasStatus());
721+
switch(read.GetStatus()) {
722+
case NKikimrProto::OK: {
723+
auto& key = Partition()->DataKeysBody.back().Key;
724+
725+
for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
726+
auto b = it.GetBatch();
727+
b.Unpack();
728+
if (!b.Empty()) {
729+
Partition()->EndWriteTimestamp = b.GetLastMessageWriteTimestamp();
730+
}
731+
}
732+
733+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
734+
<< "' partition " << Partition()->Partition
735+
<< " from last blob completed. Value " << Partition()->EndWriteTimestamp);
736+
737+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
738+
739+
break;
740+
}
741+
case NKikimrProto::OVERRUN:
742+
Y_ABORT("implement overrun in readresult!!");
743+
return;
744+
case NKikimrProto::NODATA:
745+
Y_ABORT("NODATA can't be here");
746+
return;
747+
case NKikimrProto::ERROR:
748+
PQ_LOG_ERROR("tablet " << Partition()->TabletID << " HandleOnInit topic '" << TopicName()
749+
<< "' partition " << PartitionId()
750+
<< " status NKikimrProto::ERROR result message: \"" << read.GetMessage()
751+
<< " \" errorReason: \"" << response.GetErrorReason() << "\""
752+
);
753+
PoisonPill(ctx);
754+
return;
755+
default:
756+
Cerr << "ERROR " << read.GetStatus() << " message: \"" << read.GetMessage() << "\"\n";
757+
Y_ABORT("bad status");
758+
759+
};
760+
761+
Done(ctx);
762+
}
763+
764+
656765
//
657766
// TPartition
658767
//

ydb/core/persqueue/partition_init.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,13 @@ class TInitDataStep: public TBaseKVStep {
152152
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
153153
};
154154

155+
class TInitEndWriteTimestampStep: public TBaseKVStep {
156+
friend class TPartitionTestWrapper;
157+
public:
158+
TInitEndWriteTimestampStep(TInitializer* initializer);
159+
160+
void Execute(const TActorContext& ctx) override;
161+
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
162+
};
163+
155164
} // NKikimr::NPQ

ydb/core/persqueue/partition_monitoring.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
5959
PROPERTY("StartOffset", StartOffset);
6060
PROPERTY("EndOffset", EndOffset);
6161
PROPERTY("LastOffset", Head.GetNextOffset());
62+
PROPERTY("Last message WriteTimestamp", EndWriteTimestamp.ToRfc822String());
6263
PROPERTY("HeadOffset", Head.Offset << ", count: " << Head.GetCount());
6364
}
6465
}

ydb/core/persqueue/partition_read.cpp

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,24 @@ namespace NKikimr::NPQ {
3030

3131
static const ui32 MAX_USER_ACTS = 1000;
3232

33+
TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
34+
if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) {
35+
return {};
36+
}
37+
38+
TInstant timestamp = maxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(maxTimeLagMs) : TInstant::Zero();
39+
timestamp = Max(timestamp, TInstant::MilliSeconds(readTimestampMs));
40+
timestamp = Max(timestamp, consumerReadFromTimestamp);
41+
return timestamp;
42+
}
43+
44+
ui64 TPartition::GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const {
45+
if (!readTimestamp) {
46+
return offset;
47+
}
48+
return Max(GetOffsetEstimate(DataKeysBody, *readTimestamp, Min(Head.Offset, EndOffset - 1)), offset);
49+
}
50+
3351
void TPartition::SendReadingFinished(const TString& consumer) {
3452
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie));
3553
}
@@ -170,15 +188,18 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
170188

171189
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
172190

191+
auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx);
192+
173193
TActorId sender = ActorIdFromProto(record.GetSender());
174-
if (InitDone && EndOffset > (ui64)record.GetOffset()) { //already has data, answer right now
194+
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
175195
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
176196
ctx.Send(sender, response.Release());
177197
} else if (InitDone && !IsActive()) {
178198
auto response = MakeHasDataInfoResponse(0, cookie, true);
179199
ctx.Send(sender, response.Release());
180200
} else {
181-
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
201+
202+
THasDataReq req{++HasDataReqNum, GetReadOffset((ui64)record.GetOffset(), readTimestamp), sender, cookie,
182203
record.HasClientId() && InitDone ? record.GetClientId() : ""};
183204
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
184205
auto res = HasDataRequests.insert(req);
@@ -763,11 +784,10 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
763784
}
764785
userInfo->ReadsInQuotaQueue--;
765786
ui64 offset = read->Offset;
766-
if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) {
767-
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
768-
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
769-
timestamp = Max(timestamp, userInfo->ReadFromTimestamp);
770-
offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset);
787+
788+
auto readTimestamp = GetReadFrom(read->MaxTimeLagMs, read->ReadTimestampMs, userInfo->ReadFromTimestamp, ctx);
789+
if (read->PartNo == 0 && readTimestamp) {
790+
offset = GetReadOffset(offset, readTimestamp);
771791
userInfo->ReadOffsetRewindSum += offset - read->Offset;
772792
}
773793

ydb/core/persqueue/partition_write.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,9 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
381381
void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
382382
PQ_LOG_T("TPartition::SyncMemoryStateWithKVState.");
383383

384-
if (!CompactedKeys.empty())
384+
if (!CompactedKeys.empty()) {
385385
HeadKeys.clear();
386+
}
386387

387388
if (NewHeadKey.Size > 0) {
388389
while (!HeadKeys.empty() &&
@@ -437,6 +438,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
437438
}
438439

439440
EndOffset = Head.GetNextOffset();
441+
EndWriteTimestamp = PendingWriteTimestamp;
442+
440443
NewHead.Clear();
441444
NewHead.Offset = EndOffset;
442445

@@ -1397,12 +1400,19 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
13971400
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
13981401
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
13991402
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
1400-
Head.GetBatch(pp).SerializeTo(valueD);
1403+
auto& batch = Head.GetBatch(pp);
1404+
batch.SerializeTo(valueD);
1405+
if (!batch.Empty()) {
1406+
PendingWriteTimestamp = batch.GetLastMessageWriteTimestamp();
1407+
}
14011408
}
14021409
}
14031410
for (auto& b : NewHead.GetBatches()) {
14041411
Y_ABORT_UNLESS(b.Packed);
14051412
b.SerializeTo(valueD);
1413+
if (!b.Empty()) {
1414+
PendingWriteTimestamp = b.GetLastMessageWriteTimestamp();
1415+
}
14061416
}
14071417

14081418
Y_ABORT_UNLESS(res.second >= valueD.size());

ydb/core/persqueue/read_balancer__balancing_app.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ void TBalancer::RenderApp(NApp::TNavigationBar& __navigationBar) const {
213213
}
214214
TABLER() {
215215
TABLED() { }
216-
TABLED() { __stream << "<__streamong>Total:</__streamong>"; }
216+
TABLED() { __stream << "<strong>Total:</strong>"; }
217217
TABLED() { }
218218
TABLED() { __stream << familyAllCount << " / " << activeFamilyCount << " / " << releasingFamilyCount; }
219219
TABLED() { __stream << (activePartitionCount + inactivePartitionCount + releasingPartitionCount) << " / " << activePartitionCount << " / "

0 commit comments

Comments
 (0)