@@ -1041,16 +1041,21 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
10411041 for (size_t i = 0 ; i != info.TxWritesSize (); ++i) {
10421042 auto & txWrite = info.GetTxWrites (i);
10431043 const TWriteId writeId = GetWriteId (txWrite);
1044- ui32 partitionId = txWrite.GetOriginalPartitionId ();
1045- TPartitionId shadowPartitionId (partitionId, writeId, txWrite.GetInternalPartitionId ());
10461044
1047- TxWrites[writeId].Partitions .emplace (partitionId, shadowPartitionId);
1045+ TTxWriteInfo& writeInfo = TxWrites[writeId];
1046+ if (txWrite.HasOriginalPartitionId ()) {
1047+ ui32 partitionId = txWrite.GetOriginalPartitionId ();
1048+ TPartitionId shadowPartitionId (partitionId, writeId, txWrite.GetInternalPartitionId ());
10481049
1049- AddSupportivePartition (shadowPartitionId);
1050- CreateSupportivePartitionActor (shadowPartitionId, ctx);
1051- SubscribeWriteId (writeId, ctx);
1050+ writeInfo.Partitions .emplace (partitionId, shadowPartitionId);
1051+
1052+ AddSupportivePartition (shadowPartitionId);
1053+ CreateSupportivePartitionActor (shadowPartitionId, ctx);
1054+
1055+ NextSupportivePartitionId = Max (NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1 );
1056+ }
10521057
1053- NextSupportivePartitionId = Max (NextSupportivePartitionId, shadowPartitionId. InternalPartitionId + 1 );
1058+ SubscribeWriteId (writeId, ctx );
10541059 }
10551060
10561061 NewSupportivePartitions.clear ();
@@ -3283,7 +3288,7 @@ bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& ope
32833288 TPartitionId partitionId (operation.GetPartitionId (),
32843289 writeId,
32853290 operation.GetSupportivePartition ());
3286- PQ_LOG_D (" partitionId= " << partitionId);
3291+ PQ_LOG_D (" PartitionId " << partitionId << " for WriteId " << writeId );
32873292 return Partitions.contains (partitionId);
32883293}
32893294
@@ -3294,7 +3299,6 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
32943299 }
32953300
32963301 const TWriteId writeId = GetWriteId (txBody);
3297- PQ_LOG_D (" writeId=" << writeId);
32983302
32993303 for (auto & operation : txBody.GetOperations ()) {
33003304 auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
@@ -3320,7 +3324,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33203324 const NKikimrPQ::TDataTransaction& txBody = event.GetData ();
33213325
33223326 if (TabletState != NKikimrPQ::ENormal) {
3323- PQ_LOG_D (" invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name (TabletState) << " )" );
3327+ PQ_LOG_D (" TxId " << event. GetTxId () << " invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name (TabletState) << " )" );
33243328 SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
33253329 event.GetTxId (),
33263330 NKikimrPQ::TError::ERROR,
@@ -3334,7 +3338,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33343338 //
33353339
33363340 if (txBody.OperationsSize () <= 0 ) {
3337- PQ_LOG_D (" empty list of operations" );
3341+ PQ_LOG_D (" TxId " << event. GetTxId () << " empty list of operations" );
33383342 SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
33393343 event.GetTxId (),
33403344 NKikimrPQ::TError::BAD_REQUEST,
@@ -3344,7 +3348,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33443348 }
33453349
33463350 if (!CheckTxWriteOperations (txBody)) {
3347- PQ_LOG_D (" invalid WriteId " << txBody.GetWriteId ());
3351+ PQ_LOG_D (" TxId " << event. GetTxId () << " invalid WriteId " << txBody.GetWriteId ());
33483352 SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
33493353 event.GetTxId (),
33503354 NKikimrPQ::TError::BAD_REQUEST,
@@ -3353,9 +3357,36 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33533357 return ;
33543358 }
33553359
3360+ if (txBody.HasWriteId ()) {
3361+ const TWriteId writeId = GetWriteId (txBody);
3362+ if (!TxWrites.contains (writeId)) {
3363+ PQ_LOG_D (" TxId " << event.GetTxId () << " unknown WriteId " << writeId);
3364+ SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3365+ event.GetTxId (),
3366+ NKikimrPQ::TError::BAD_REQUEST,
3367+ " unknown WriteId" ,
3368+ ctx);
3369+ return ;
3370+ }
3371+
3372+ TTxWriteInfo& writeInfo = TxWrites.at (writeId);
3373+ if (writeInfo.Deleting ) {
3374+ PQ_LOG_W (" TxId " << event.GetTxId () << " WriteId " << writeId << " will be deleted" );
3375+ SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3376+ event.GetTxId (),
3377+ NKikimrPQ::TError::BAD_REQUEST,
3378+ " WriteId will be deleted" ,
3379+ ctx);
3380+ return ;
3381+ }
3382+
3383+ writeInfo.TxId = event.GetTxId ();
3384+ PQ_LOG_D (" TxId " << event.GetTxId () << " has WriteId " << writeId);
3385+ }
3386+
33563387 TMaybe<TPartitionId> partitionId = FindPartitionId (txBody);
33573388 if (!partitionId.Defined ()) {
3358- PQ_LOG_D ( " unknown partition for WriteId " << txBody.GetWriteId ());
3389+ PQ_LOG_W ( " TxId " << event. GetTxId () << " unknown partition for WriteId " << txBody.GetWriteId ());
33593390 SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
33603391 event.GetTxId (),
33613392 NKikimrPQ::TError::INTERNAL,
@@ -3568,13 +3599,15 @@ bool TPersQueue::CanProcessTxWrites() const
35683599void TPersQueue::SubscribeWriteId (const TWriteId& writeId,
35693600 const TActorContext& ctx)
35703601{
3602+ PQ_LOG_D (" send TEvSubscribeLock for WriteId " << writeId);
35713603 ctx.Send (NLongTxService::MakeLongTxServiceID (writeId.NodeId ),
35723604 new NLongTxService::TEvLongTxService::TEvSubscribeLock (writeId.KeyId , writeId.NodeId ));
35733605}
35743606
35753607void TPersQueue::UnsubscribeWriteId (const TWriteId& writeId,
35763608 const TActorContext& ctx)
35773609{
3610+ PQ_LOG_D (" send TEvUnsubscribeLock for WriteId " << writeId);
35783611 ctx.Send (NLongTxService::MakeLongTxServiceID (writeId.NodeId ),
35793612 new NLongTxService::TEvLongTxService::TEvUnsubscribeLock (writeId.KeyId , writeId.NodeId ));
35803613}
@@ -3876,11 +3909,16 @@ void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info)
38763909void TPersQueue::SaveTxWrites (NKikimrPQ::TTabletTxInfo& info)
38773910{
38783911 for (auto & [writeId, write] : TxWrites) {
3879- for ( auto [partitionId, shadowPartitionId] : write.Partitions ) {
3912+ if ( write.Partitions . empty () ) {
38803913 auto * txWrite = info.MutableTxWrites ()->Add ();
38813914 SetWriteId (*txWrite, writeId);
3882- txWrite->SetOriginalPartitionId (partitionId);
3883- txWrite->SetInternalPartitionId (shadowPartitionId.InternalPartitionId );
3915+ } else {
3916+ for (auto [partitionId, shadowPartitionId] : write.Partitions ) {
3917+ auto * txWrite = info.MutableTxWrites ()->Add ();
3918+ SetWriteId (*txWrite, writeId);
3919+ txWrite->SetOriginalPartitionId (partitionId);
3920+ txWrite->SetInternalPartitionId (shadowPartitionId.InternalPartitionId );
3921+ }
38843922 }
38853923 }
38863924
@@ -4325,6 +4363,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43254363
43264364 WriteTx (tx, NKikimrPQ::TTransaction::EXECUTED);
43274365
4366+ PQ_LOG_D (" delete partitions for TxId " << tx.TxId );
4367+ BeginDeletePartitions (tx);
4368+
43284369 tx.State = NKikimrPQ::TTransaction::EXECUTED;
43294370 PQ_LOG_D (" TxId " << tx.TxId <<
43304371 " , NewState " << NKikimrPQ::TTransaction_EState_Name (tx.State ));
@@ -4343,8 +4384,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43434384
43444385 case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
43454386 PQ_LOG_D (" HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive () <<
4346- " , WriteIdIsDisabled " << WriteIdIsDisabled (tx.WriteId ));
4347- if (tx.HaveAllRecipientsReceive () && WriteIdIsDisabled (tx.WriteId )) {
4387+ " , AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted (tx.WriteId ));
4388+ if (tx.HaveAllRecipientsReceive () && AllSupportivePartitionsHaveBeenDeleted (tx.WriteId )) {
43484389 DeleteTx (tx);
43494390 // implicitly switch to the state DELETING
43504391 }
@@ -4369,7 +4410,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43694410 }
43704411}
43714412
4372- bool TPersQueue::WriteIdIsDisabled (const TMaybe<TWriteId>& writeId) const
4413+ bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted (const TMaybe<TWriteId>& writeId) const
43734414{
43744415 if (!writeId.Defined ()) {
43754416 return true ;
@@ -4380,26 +4421,21 @@ bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
43804421 TabletID (), writeId->NodeId , writeId->KeyId );
43814422 const TTxWriteInfo& writeInfo = TxWrites.at (*writeId);
43824423
4383- bool disabled =
4384- (writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
4424+ PQ_LOG_D (" WriteId " << *writeId <<
4425+ " Partitions.size=" << writeInfo.Partitions .size ());
4426+ bool deleted =
43854427 writeInfo.Partitions .empty ()
43864428 ;
43874429
4388- PQ_LOG_D (" WriteId " << *writeId << " is " << (disabled ? " disabled" : " enabled" ));
4389-
4390- return disabled;
4430+ return deleted;
43914431}
43924432
43934433void TPersQueue::DeleteWriteId (const TMaybe<TWriteId>& writeId)
43944434{
4395- if (!writeId.Defined ()) {
4435+ if (!writeId.Defined () || !TxWrites. contains (*writeId) ) {
43964436 return ;
43974437 }
43984438
4399- Y_ABORT_UNLESS (TxWrites.contains (*writeId),
4400- " PQ %" PRIu64 " , WriteId {%" PRIu64 " , %" PRIu64 " }" ,
4401- TabletID (), writeId->NodeId , writeId->KeyId );
4402-
44034439 PQ_LOG_D (" delete WriteId " << *writeId);
44044440 TxWrites.erase (*writeId);
44054441}
@@ -4729,7 +4765,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
47294765 }
47304766}
47314767
4732- void TPersQueue::Handle (NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx )
4768+ void TPersQueue::Handle (NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev)
47334769{
47344770 PQ_LOG_D (" Handle TEvLongTxService::TEvLockStatus " << ev->Get ()->Record .ShortDebugString ());
47354771
@@ -4750,22 +4786,14 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
47504786 return ;
47514787 }
47524788
4753- if (!writeInfo.TxId .Defined ()) {
4754- PQ_LOG_D (" delete write info for WriteId " << writeId);
4755- // the message TEvProposeTransaction will not come anymore
4756- BeginDeletePartitions (writeInfo);
4789+ if (writeInfo.TxId .Defined ()) {
4790+ // the message `TEvProposeTransaction` has already arrived
4791+ PQ_LOG_D (" there is already a transaction TxId " << writeInfo.TxId << " for WriteId " << writeId);
47574792 return ;
47584793 }
47594794
4760- ui64 txId = *writeInfo.TxId ;
4761- PQ_LOG_D (" delete write info for WriteId " << writeId << " and TxId " << txId);
4762-
4763- auto * tx = GetTransaction (ctx, txId);
4764- if (!tx ||
4765- (tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
4766- (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
4767- BeginDeletePartitions (writeInfo);
4768- }
4795+ PQ_LOG_D (" delete partitions for WriteId " << writeId);
4796+ BeginDeletePartitions (writeInfo);
47694797}
47704798
47714799void TPersQueue::Handle (TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
@@ -4865,6 +4893,16 @@ void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
48654893 writeInfo.Deleting = true ;
48664894}
48674895
4896+ void TPersQueue::BeginDeletePartitions (const TDistributedTransaction& tx)
4897+ {
4898+ if (!tx.WriteId .Defined () || !TxWrites.contains (*tx.WriteId )) {
4899+ return ;
4900+ }
4901+
4902+ TTxWriteInfo& writeInfo = TxWrites.at (*tx.WriteId );
4903+ BeginDeletePartitions (writeInfo);
4904+ }
4905+
48684906TString TPersQueue::LogPrefix () const {
48694907 return TStringBuilder () << " [PQ: " << TabletID () << " ] " ;
48704908}
@@ -4919,7 +4957,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
49194957 HFuncTraced (TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
49204958 HFuncTraced (TEvPQ::TEvCheckPartitionStatusRequest, Handle);
49214959 HFuncTraced (TEvPQ::TEvPartitionScaleStatusChanged, Handle);
4922- HFuncTraced (NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
4960+ hFuncTraced (NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
49234961 HFuncTraced (TEvPQ::TEvReadingPartitionStatusRequest, Handle);
49244962 HFuncTraced (TEvPQ::TEvDeletePartitionDone, Handle);
49254963 HFuncTraced (TEvPQ::TEvTransactionCompleted, Handle);
0 commit comments