@@ -28,6 +28,7 @@ TInitializer::TInitializer(TPartition* partition)
2828 Steps.push_back (MakeHolder<TInitInfoRangeStep>(this ));
2929 Steps.push_back (MakeHolder<TInitDataRangeStep>(this ));
3030 Steps.push_back (MakeHolder<TInitDataStep>(this ));
31+ Steps.push_back (MakeHolder<TInitEndWriteTimestampStep>(this ));
3132
3233 CurrentStep = Steps.begin ();
3334}
@@ -311,14 +312,14 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
311312 bool res = meta.ParseFromString (response.GetValue ());
312313 Y_ABORT_UNLESS (res);
313314
314- /* Bring back later, when switch to 21-2 will be unable
315- StartOffset = meta.GetStartOffset();
316- EndOffset = meta.GetEndOffset();
317- if (StartOffset == EndOffset) {
318- NewHead.Offset = Head.Offset = EndOffset;
319- }
320- */
315+ Partition ()->StartOffset = meta.GetStartOffset ();
316+ Partition ()->EndOffset = meta.GetEndOffset ();
317+ if (Partition ()->StartOffset == Partition ()->EndOffset ) {
318+ Partition ()->NewHead .Offset = Partition ()->Head .Offset = Partition ()->EndOffset ;
319+ }
321320 Partition ()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace ();
321+ Partition ()->EndWriteTimestamp = TInstant::MilliSeconds (meta.GetEndWriteTimestamp ());
322+ Partition ()->PendingWriteTimestamp = Partition ()->EndWriteTimestamp ;
322323 if (Partition ()->IsSupportive ()) {
323324 const auto & counterData = meta.GetCounterData ();
324325 Partition ()->BytesWrittenGrpc .SetSavedValue (counterData.GetBytesWrittenGrpc ());
@@ -503,7 +504,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
503504 if (k.GetPartNo () > 0 ) ++startOffset;
504505 head.PartNo = 0 ;
505506 } else {
506- Y_ABORT_UNLESS (endOffset <= k.GetOffset (), " %s " , pair.GetKey ().c_str ());
507+ Y_ABORT_UNLESS (endOffset <= k.GetOffset (), " %" PRIu64 " <= % " PRIu64 " %s " , endOffset, k. GetOffset () , pair.GetKey ().c_str ());
507508 if (endOffset < k.GetOffset ()) {
508509 gapOffsets.push_back (std::make_pair (endOffset, k.GetOffset ()));
509510 gapSize += k.GetOffset () - endOffset;
@@ -631,7 +632,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
631632
632633 Y_ABORT_UNLESS (offset + 1 >= Partition ()->StartOffset );
633634 Y_ABORT_UNLESS (offset < Partition ()->EndOffset );
634- Y_ABORT_UNLESS (size == read.GetValue ().size ());
635+ Y_ABORT_UNLESS (size == read.GetValue ().size (), " size=% " PRIu32 " == read.GetValue().size() =% " PRIu64, size, read. GetValue (). size () );
635636
636637 for (TBlobIterator it (key, read.GetValue ()); it.IsValid (); it.Next ()) {
637638 head.AddBatch (it.GetBatch ());
@@ -667,6 +668,43 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
667668}
668669
669670
671+ //
672+ // TInitEndWriteTimestampStep
673+ //
674+
675+ TInitEndWriteTimestampStep::TInitEndWriteTimestampStep (TInitializer* initializer)
676+ : TInitializerStep(initializer, " TInitEndWriteTimestampStep" , true ) {
677+ }
678+
679+ void TInitEndWriteTimestampStep::Execute (const TActorContext &ctx) {
680+ if (Partition ()->EndWriteTimestamp != TInstant::Zero () || (Partition ()->HeadKeys .empty () && Partition ()->DataKeysBody .empty ())) {
681+ LOG_ERROR_S (ctx, NKikimrServices::PERSQUEUE,
682+ " Initializing EndWriteTimestamp of the topic '" << Partition ()->TopicName ()
683+ << " ' partition " << Partition ()->Partition
684+ << " skiped because already initialized." );
685+ return Done (ctx);
686+ }
687+
688+ TDataKey* lastKey = nullptr ;
689+ if (!Partition ()->HeadKeys .empty ()) {
690+ lastKey = &Partition ()->HeadKeys .back ();
691+ } else if (!Partition ()->DataKeysBody .empty ()) {
692+ lastKey = &Partition ()->DataKeysBody .back ();
693+ }
694+
695+ if (lastKey) {
696+ Partition ()->EndWriteTimestamp = lastKey->Timestamp ;
697+ Partition ()->PendingWriteTimestamp = Partition ()->EndWriteTimestamp ;
698+ }
699+
700+ LOG_ERROR_S (ctx, NKikimrServices::PERSQUEUE,
701+ " Initializing EndWriteTimestamp of the topic '" << Partition ()->TopicName ()
702+ << " ' partition " << Partition ()->Partition
703+ << " from keys completed. Value " << Partition ()->EndWriteTimestamp );
704+
705+ return Done (ctx);
706+ }
707+
670708//
671709// TPartition
672710//
0 commit comments