Skip to content

Commit ed844e3

Browse files
authored
Merge 40a5e60 into a328b30
2 parents a328b30 + 40a5e60 commit ed844e3

21 files changed

+417
-56
lines changed

ydb/core/persqueue/blob.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ void TBatch::Pack() {
399399
Header.SetPayloadSize(PackedData.size());
400400
}
401401

402+
for (auto& b : Blobs) {
403+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
404+
}
405+
402406

403407
TVector<TClientBlob> tmp;
404408
Blobs.swap(tmp);
@@ -414,11 +418,14 @@ void TBatch::Unpack() {
414418
UnpackTo(&Blobs);
415419
Y_ABORT_UNLESS(InternalPartsPos.empty());
416420
for (ui32 i = 0; i < Blobs.size(); ++i) {
417-
if (!Blobs[i].IsLastPart())
421+
auto& b = Blobs[i];
422+
if (!b.IsLastPart()) {
418423
InternalPartsPos.push_back(i);
424+
}
425+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
419426
}
420427
Y_ABORT_UNLESS(InternalPartsPos.size() == GetInternalPartsCount());
421-
428+
422429
PackedData.Clear();
423430
}
424431

@@ -978,4 +985,3 @@ bool TPartitionedBlob::IsNextPart(const TString& sourceId, const ui64 seqNo, con
978985

979986
}// NPQ
980987
}// NKikimr
981-

ydb/core/persqueue/blob.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ struct TClientBlob {
101101
void SerializeTo(TBuffer& buffer) const;
102102
static TClientBlob Deserialize(const char *data, ui32 size);
103103

104-
static void CheckBlob(const TKey& key, const TString& blob);
104+
static void CheckBlob(const TKey& key, const TString& blob);
105105
};
106106

107107
static constexpr const ui32 MAX_BLOB_SIZE = 8_MB;
@@ -121,6 +121,7 @@ struct TBatch {
121121
TVector<ui32> InternalPartsPos;
122122
NKikimrPQ::TBatchHeader Header;
123123
TBuffer PackedData;
124+
TInstant EndWriteTimestamp;
124125

125126
TBatch()
126127
: Packed(false)
@@ -162,27 +163,42 @@ struct TBatch {
162163
Header.SetUnpackedSize(unpackedSize);
163164
Header.SetCount(count);
164165
Header.SetInternalPartsCount(InternalPartsPos.size());
166+
167+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
165168
}
166169

167170
// Offset recorded in this batch's header.
ui64 GetOffset() const {
    const ui64 headerOffset = Header.GetOffset();
    return headerOffset;
}
173+
170174
// Part number recorded in this batch's header.
ui16 GetPartNo() const {
    const ui16 headerPartNo = Header.GetPartNo();
    return headerPartNo;
}
177+
173178
// Unpacked size recorded in this batch's header.
ui32 GetUnpackedSize() const {
    const ui32 headerUnpackedSize = Header.GetUnpackedSize();
    return headerUnpackedSize;
}
181+
176182
// Count value recorded in this batch's header.
ui32 GetCount() const {
    const ui32 headerCount = Header.GetCount();
    return headerCount;
}
185+
179186
// Internal parts count recorded in this batch's header.
ui16 GetInternalPartsCount() const {
    const ui16 headerInternalParts = Header.GetInternalPartsCount();
    return headerInternalParts;
}
189+
182190
// Lexicographic comparison of this batch's (offset, partNo) against the given
// pair: the offsets decide unless they are equal, in which case the part
// numbers decide.
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
    const ui64 ownOffset = GetOffset();
    if (ownOffset != offset) {
        return ownOffset > offset;
    }
    return GetPartNo() > partNo;
}
185193

194+
bool Empty() const {
195+
return Blobs.empty();
196+
}
197+
198+
// Current value of the EndWriteTimestamp member.
TInstant GetEndWriteTimestamp() const {
    const TInstant endWriteTimestamp = EndWriteTimestamp;
    return endWriteTimestamp;
}
201+
186202
TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
187203
: Packed(true)
188204
, Header(header)
@@ -239,7 +255,7 @@ struct THead {
239255
private:
240256
std::deque<TBatch> Batches;
241257
ui16 InternalPartsCount = 0;
242-
258+
243259
friend class TPartitionedBlob;
244260

245261
class TBatchAccessor {

ydb/core/persqueue/partition.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,14 @@ ui64 TPartition::ImportantClientsMinOffset() const {
313313
return minOffset;
314314
}
315315

316+
// Read-only accessor for the partition's EndWriteTimestamp member.
TInstant TPartition::GetEndWriteTimestamp() const {
    const TInstant endWriteTimestamp = EndWriteTimestamp;
    return endWriteTimestamp;
}
319+
320+
// Gives callers mutable access to the partition's Head member.
THead& TPartition::GetHead() {
    THead& head = Head;
    return head;
}
323+
316324
void TPartition::HandleWakeup(const TActorContext& ctx) {
317325
FilterDeadlinedWrites(ctx);
318326

@@ -356,6 +364,7 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
356364
meta.SetStartOffset(StartOffset);
357365
meta.SetEndOffset(Max(NewHead.GetNextOffset(), EndOffset));
358366
meta.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
367+
meta.SetEndWriteTimestamp(PendingWriteTimestamp.MilliSeconds());
359368

360369
if (IsSupportive()) {
361370
auto* counterData = meta.MutableCounterData();

ydb/core/persqueue/partition.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
116116
friend TInitInfoRangeStep;
117117
friend TInitDataRangeStep;
118118
friend TInitDataStep;
119+
friend TInitEndWriteTimestampStep;
119120

120121
friend TPartitionSourceManager;
121122

@@ -440,6 +441,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
440441
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
441442
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
442443

444+
ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const;
443445

444446
public:
445447
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -470,6 +472,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
470472
// Minimal offset, the data from which cannot be deleted, because it is required by an important consumer
471473
ui64 ImportantClientsMinOffset() const;
472474

475+
TInstant GetEndWriteTimestamp() const; // For tests only
476+
THead& GetHead(); // For tests only
473477

474478
//Bootstrap sends kvRead
475479
//Become StateInit
@@ -666,6 +670,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
666670
// [DataKeysBody ][DataKeysHead ]
667671
ui64 StartOffset;
668672
ui64 EndOffset;
673+
TInstant EndWriteTimestamp;
674+
TInstant PendingWriteTimestamp;
669675

670676
ui64 WriteInflightSize;
671677
TActorId Tablet;

ydb/core/persqueue/partition_init.cpp

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ TInitializer::TInitializer(TPartition* partition)
3030
Steps.push_back(MakeHolder<TInitInfoRangeStep>(this));
3131
Steps.push_back(MakeHolder<TInitDataRangeStep>(this));
3232
Steps.push_back(MakeHolder<TInitDataStep>(this));
33+
Steps.push_back(MakeHolder<TInitEndWriteTimestampStep>(this));
3334

3435
CurrentStep = Steps.begin();
3536
}
@@ -316,6 +317,8 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
316317
}
317318
*/
318319
Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
320+
Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetEndWriteTimestamp());
321+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
319322
if (Partition()->IsSupportive()) {
320323
const auto& counterData = meta.GetCounterData();
321324
Partition()->BytesWrittenGrpc.SetSavedValue(counterData.GetBytesWrittenGrpc());
@@ -495,7 +498,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
495498
if (k.GetPartNo() > 0) ++startOffset;
496499
head.PartNo = 0;
497500
} else {
498-
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%s", pair.GetKey().c_str());
501+
// Offsets are wider than int, so "%d" would mismatch the variadic arguments;
// cast explicitly so the printf specifier and the argument type always agree.
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%llu <= %llu %s",
    static_cast<unsigned long long>(endOffset),
    static_cast<unsigned long long>(k.GetOffset()),
    pair.GetKey().c_str());
499502
if (endOffset < k.GetOffset()) {
500503
gapOffsets.push_back(std::make_pair(endOffset, k.GetOffset()));
501504
gapSize += k.GetOffset() - endOffset;
@@ -619,7 +622,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
619622

620623
Y_ABORT_UNLESS(offset + 1 >= Partition()->StartOffset);
621624
Y_ABORT_UNLESS(offset < Partition()->EndOffset);
622-
Y_ABORT_UNLESS(size == read.GetValue().size());
625+
// size() yields size_t, which "%d" does not describe; print both values as
// size_t ("%zu") so the specifiers match the arguments on every platform.
Y_ABORT_UNLESS(size == read.GetValue().size(), "size=%zu == read.GetValue().size() = %zu",
    static_cast<size_t>(size),
    static_cast<size_t>(read.GetValue().size()));
623626

624627
for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
625628
head.AddBatch(it.GetBatch());
@@ -653,6 +656,117 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
653656
}
654657

655658

659+
//
660+
// TInitEndWriteTimestampStep
661+
//
662+
663+
// Builds the step on top of TBaseKVStep, passing the step name
// "TInitEndWriteTimestampStep".
// NOTE(review): the meaning of the trailing `true` flag is defined by
// TBaseKVStep, which is not visible here - confirm against its declaration.
TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer)
    : TBaseKVStep(initializer, "TInitEndWriteTimestampStep", true)
{
}
666+
667+
void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
668+
if (Partition()->EndWriteTimestamp != TInstant::Zero() || (Partition()->Head.GetBatches().empty() && Partition()->DataKeysBody.empty())) {
669+
PQ_LOG_D("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
670+
<< "' partition " << Partition()->Partition
671+
<< " skiped because already initialized.");
672+
return Done(ctx);
673+
}
674+
675+
PQ_LOG_D("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
676+
<< "' partition " << Partition()->Partition
677+
<< ".");
678+
679+
auto& head = Partition()->Head;
680+
if (!head.GetBatches().empty()) {
681+
auto& batch = head.GetLastBatch();
682+
Y_VERIFY (batch.Packed);
683+
684+
TVector<TClientBlob> result;
685+
batch.UnpackTo(&result);
686+
Y_VERIFY(!result.empty());
687+
688+
Partition()->EndWriteTimestamp = result.back().WriteTimestamp;
689+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
690+
691+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
692+
<< "' partition " << Partition()->Partition
693+
<< " from head completed. Value " << Partition()->EndWriteTimestamp);
694+
695+
return Done(ctx);
696+
}
697+
698+
if (Partition()->DataKeysBody.empty()) {
699+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
700+
<< "' partition " << Partition()->Partition
701+
<< " skiped because DataKeys is empty.");
702+
703+
return Done(ctx);
704+
}
705+
706+
auto& p = Partition()->DataKeysBody.back();
707+
708+
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
709+
auto read = request->Record.AddCmdRead();
710+
read->SetKey({p.Key.Data(), p.Key.Size()});
711+
712+
ctx.Send(Partition()->Tablet, request.Release());
713+
}
714+
715+
// Processes the KV response to the read issued by Execute().
//
// On NKikimrProto::OK every batch of the returned blob is unpacked, and the
// EndWriteTimestamp of each non-empty batch is stored into the partition (the
// value from the last non-empty batch wins). PendingWriteTimestamp is then set
// to the same value and the step is completed.
// An invalid response or NKikimrProto::ERROR sends a poison pill; any other
// status aborts the process.
void TInitEndWriteTimestampStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) {
    if (!ValidateResponse(*this, ev, ctx)) {
        PoisonPill(ctx);
        return;
    }

    auto& record = ev->Get()->Record;
    Y_ABORT_UNLESS(1 == record.ReadResultSize());

    auto& readResult = record.GetReadResult(0);
    Y_ABORT_UNLESS(readResult.HasStatus());

    // Deal with every non-OK status first; only OK continues past the switch.
    switch (readResult.GetStatus()) {
        case NKikimrProto::OK:
            break;
        case NKikimrProto::OVERRUN:
            Y_ABORT("implement overrun in readresult!!");
            return;
        case NKikimrProto::NODATA:
            Y_ABORT("NODATA can't be here");
            return;
        case NKikimrProto::ERROR:
            PQ_LOG_ERROR("tablet " << Partition()->TabletID << " HandleOnInit topic '" << TopicName()
                << "' partition " << PartitionId()
                << " status NKikimrProto::ERROR result message: \"" << readResult.GetMessage()
                << " \" errorReason: \"" << record.GetErrorReason() << "\""
            );
            PoisonPill(ctx);
            return;
        default:
            Cerr << "ERROR " << readResult.GetStatus() << " message: \"" << readResult.GetMessage() << "\"\n";
            Y_ABORT("bad status");
    }

    auto& lastKey = Partition()->DataKeysBody.back().Key;

    TBlobIterator blobIt(lastKey, readResult.GetValue());
    while (blobIt.IsValid()) {
        auto batch = blobIt.GetBatch();
        batch.Unpack();
        if (!batch.Empty()) {
            Partition()->EndWriteTimestamp = batch.GetEndWriteTimestamp();
        }
        blobIt.Next();
    }

    PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
        << "' partition " << Partition()->Partition
        << " from last blob completed. Value " << Partition()->EndWriteTimestamp);

    Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;

    Done(ctx);
}
768+
769+
656770
//
657771
// TPartition
658772
//

ydb/core/persqueue/partition_init.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,13 @@ class TInitDataStep: public TBaseKVStep {
152152
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
153153
};
154154

155+
class TInitEndWriteTimestampStep: public TBaseKVStep {
156+
friend class TPartitionTestWrapper;
157+
public:
158+
TInitEndWriteTimestampStep(TInitializer* initializer);
159+
160+
void Execute(const TActorContext& ctx) override;
161+
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
162+
};
163+
155164
} // NKikimr::NPQ

ydb/core/persqueue/partition_monitoring.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
5959
PROPERTY("StartOffset", StartOffset);
6060
PROPERTY("EndOffset", EndOffset);
6161
PROPERTY("LastOffset", Head.GetNextOffset());
62+
PROPERTY("Last message WriteTimestamp", EndWriteTimestamp.ToRfc822String());
6263
PROPERTY("HeadOffset", Head.Offset << ", count: " << Head.GetCount());
6364
}
6465
}

0 commit comments

Comments
 (0)