@@ -303,10 +303,11 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
303303 // TODO: Maybe there are better ways to initialize new shards...
304304 for (const auto & shardInfo : ShardedWriteController->GetPendingShards ()) {
305305 TxManager->AddShard (shardInfo.ShardId , IsOlap (), TablePath);
306- TxManager-> AddAction (shardInfo. ShardId , IKqpTransactionManager::EAction::WRITE) ;
306+ IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
307307 if (shardInfo.HasRead ) {
308- TxManager-> AddAction (shardInfo. ShardId , IKqpTransactionManager::EAction::READ) ;
308+ flags |= IKqpTransactionManager::EAction::READ;
309309 }
310+ TxManager->AddAction (shardInfo.ShardId , flags);
310311 }
311312 }
312313
@@ -540,7 +541,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
540541 << " ShardID=" << ev->Get ()->Record .GetOrigin () << " ,"
541542 << " Sink=" << this ->SelfId () << " ."
542543 << getIssues ().ToOneLineString ());
543-
544544 // TODO: Add new status for splits in datashard. This is tmp solution.
545545 if (getIssues ().ToOneLineString ().Contains (" in a pre/offline state assuming this is due to a finished split (wrong shard state)" )) {
546546 ResetShardRetries (ev->Get ()->Record .GetOrigin (), ev->Cookie );
@@ -561,13 +561,12 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
561561 << " ShardID=" << ev->Get ()->Record .GetOrigin () << " ,"
562562 << " Sink=" << this ->SelfId () << " ."
563563 << getIssues ().ToOneLineString ());
564-
565- RuntimeError (
566- TStringBuilder () << " Disk space exhausted for table `"
567- << TablePath << " `. "
568- << getIssues ().ToOneLineString (),
569- NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
570- getIssues ());
564+ RuntimeError (
565+ TStringBuilder () << " Disk space exhausted for table `"
566+ << TablePath << " `. "
567+ << getIssues ().ToOneLineString (),
568+ NYql::NDqProto::StatusIds::UNAVAILABLE,
569+ getIssues ());
571570 return ;
572571 }
573572 case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
@@ -670,7 +669,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
670669 preparedInfo.Coordinator = domainCoordinators.Select (*TxId);
671670 }
672671
673- OnMessageAcknowledged (ev->Get ()->Record .GetOrigin ());
672+ OnMessageReceived (ev->Get ()->Record .GetOrigin ());
674673 const auto result = ShardedWriteController->OnMessageAcknowledged (
675674 ev->Get ()->Record .GetOrigin (), ev->Cookie );
676675 if (result) {
@@ -713,7 +712,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
713712 return ;
714713 }
715714
716- OnMessageAcknowledged (ev->Get ()->Record .GetOrigin ());
715+ OnMessageReceived (ev->Get ()->Record .GetOrigin ());
717716 const auto result = ShardedWriteController->OnMessageAcknowledged (
718717 ev->Get ()->Record .GetOrigin (), ev->Cookie );
719718 if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) {
@@ -723,7 +722,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
723722 }
724723 }
725724
726- void OnMessageAcknowledged (const ui64 shardId) {
725+ void OnMessageReceived (const ui64 shardId) {
727726 if (auto it = SendTime.find (shardId); it != std::end (SendTime)) {
728727 Counters->WriteActorWritesLatencyHistogram ->Collect ((TInstant::Now () - it->second ).MilliSeconds ());
729728 SendTime.erase (it);
@@ -847,7 +846,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
847846 << " , Attempts=" << metadata->SendAttempts << " , Mode=" << static_cast <int >(Mode));
848847 Send (
849848 PipeCacheId,
850- new TEvPipeCache::TEvForward (evWrite.release (), shardId, true ),
849+ new TEvPipeCache::TEvForward (evWrite.release (), shardId, /* subscribe */ true ),
851850 IEventHandle::FlagTrackDelivery,
852851 metadata->Cookie ,
853852 TableWriteActorSpan.GetTraceId ());
@@ -1270,6 +1269,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
12701269 , Alloc(std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__))
12711270 , TypeEnv(*Alloc)
12721271 , Counters(settings.Counters)
1272+ , TxProxyMon(settings.TxProxyMon)
12731273 , BufferWriteActor(TWilsonKqp::BufferWriteActor, NWilson::TTraceId(settings.TraceId), " TKqpBufferWriteActor" , NWilson::EFlags::AUTO_END)
12741274 , BufferWriteActorState(TWilsonKqp::BufferWriteActorState, BufferWriteActor.GetTraceId(),
12751275 " BufferWriteActorState::Writing" , NWilson::EFlags::AUTO_END)
@@ -1552,6 +1552,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
15521552 FillEvWritePrepare (evWrite.get (), shardId, *TxId, TxManager);
15531553 }
15541554
1555+ SendTime[shardId] = TInstant::Now ();
15551556 CA_LOG_D (" Send EvWrite (external) to ShardID=" << shardId << " , isPrepare=" << !isRollback << " , isImmediateCommit=" << isRollback << " , TxId=" << evWrite->Record .GetTxId ()
15561557 << " , LockTxId=" << evWrite->Record .GetLockTxId () << " , LockNodeId=" << evWrite->Record .GetLockNodeId ()
15571558 << " , Locks= " << [&]() {
@@ -1565,10 +1566,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
15651566 << " , OperationsCount=" << 0 << " , IsFinal=" << 1
15661567 << " , Attempts=" << 0 );
15671568
1569+ // TODO: Track latecy
15681570 Send (
15691571 NKikimr::MakePipePerNodeCacheID (false ),
1570- new TEvPipeCache::TEvForward (evWrite.release (), shardId, true ),
1571- 0 ,
1572+ new TEvPipeCache::TEvForward (evWrite.release (), shardId, /* subscribe */ true ),
1573+ IEventHandle::FlagTrackDelivery ,
15721574 0 );
15731575 }
15741576 }
@@ -1672,26 +1674,30 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
16721674
16731675 switch (res->GetStatus ()) {
16741676 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted:
1675- // TODO: metrics
1677+ TxProxyMon-> ClientTxStatusAccepted -> Inc ();
16761678 break ;
16771679 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusProcessed:
1680+ TxProxyMon->ClientTxStatusProcessed ->Inc ();
16781681 break ;
16791682 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusConfirmed:
1683+ TxProxyMon->ClientTxStatusConfirmed ->Inc ();
16801684 break ;
16811685
16821686 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned:
1687+ TxProxyMon->ClientTxStatusPlanned ->Inc ();
16831688 break ;
16841689
16851690 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated:
16861691 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclined:
16871692 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclinedNoSpace:
16881693 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting:
1689- // TODO: CancelProposal???
1694+ TxProxyMon-> ClientTxStatusCoordinatorDeclined -> Inc ();
16901695 ReplyErrorAndDie (TStringBuilder () << " Failed to plan transaction, status: " << res->GetStatus (), NYql::NDqProto::StatusIds::UNAVAILABLE, {});
16911696 break ;
16921697
16931698 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusUnknown:
16941699 case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAborted:
1700+ TxProxyMon->ClientTxStatusCoordinatorDeclined ->Inc ();
16951701 ReplyErrorAndDie (TStringBuilder () << " Unexpected TEvProposeTransactionStatus status: " << res->GetStatus (), NYql::NDqProto::StatusIds::INTERNAL_ERROR, {});
16961702 break ;
16971703 }
@@ -1797,7 +1803,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17971803 << " ShardID=" << ev->Get ()->Record .GetOrigin () << " ,"
17981804 << " Sink=" << this ->SelfId () << " ."
17991805 << getIssues ().ToOneLineString ());
1800-
18011806 ReplyErrorAndDie (
18021807 TStringBuilder () << " Internal error for table. "
18031808 << getIssues ().ToOneLineString (),
@@ -1810,11 +1815,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18101815 << " ShardID=" << ev->Get ()->Record .GetOrigin () << " ,"
18111816 << " Sink=" << this ->SelfId () << " ."
18121817 << getIssues ().ToOneLineString ());
1813-
18141818 ReplyErrorAndDie (
18151819 TStringBuilder () << " Disk space exhausted for table. "
18161820 << getIssues ().ToOneLineString (),
1817- NYql::NDqProto::StatusIds::PRECONDITION_FAILED ,
1821+ NYql::NDqProto::StatusIds::UNAVAILABLE ,
18181822 getIssues ());
18191823 return ;
18201824 }
@@ -1824,7 +1828,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18241828 << " Sink=" << this ->SelfId () << " ."
18251829 << " Ignored this error."
18261830 << getIssues ().ToOneLineString ());
1827- // TODO: support waiting
18281831 ReplyErrorAndDie (
18291832 TStringBuilder () << " Tablet " << ev->Get ()->Record .GetOrigin () << " is overloaded."
18301833 << getIssues ().ToOneLineString (),
@@ -1886,11 +1889,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18861889 }
18871890 }
18881891
1892+ void OnMessageReceived (const ui64 shardId) {
1893+ if (auto it = SendTime.find (shardId); it != std::end (SendTime)) {
1894+ Counters->WriteActorWritesLatencyHistogram ->Collect ((TInstant::Now () - it->second ).MilliSeconds ());
1895+ SendTime.erase (it);
1896+ }
1897+ }
1898+
18891899 void ProcessWritePreparedShard (NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
18901900 if (State != EState::PREPARING) {
18911901 CA_LOG_D (" Ignored write prepared event." );
18921902 return ;
18931903 }
1904+ OnMessageReceived (ev->Get ()->Record .GetOrigin ());
1905+ CA_LOG_D (" Got prepared result TxId=" << ev->Get ()->Record .GetTxId ()
1906+ << " , TabletId=" << ev->Get ()->Record .GetOrigin ()
1907+ << " , Cookie=" << ev->Cookie );
1908+
18941909 const auto & record = ev->Get ()->Record ;
18951910 IKqpTransactionManager::TPrepareResult preparedInfo;
18961911 preparedInfo.ShardId = record.GetOrigin ();
@@ -1912,6 +1927,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19121927 CA_LOG_D (" Ignored write completed event." );
19131928 return ;
19141929 }
1930+ OnMessageReceived (ev->Get ()->Record .GetOrigin ());
19151931 CA_LOG_D (" Got completed result TxId=" << ev->Get ()->Record .GetTxId ()
19161932 << " , TabletId=" << ev->Get ()->Record .GetOrigin ()
19171933 << " , Cookie=" << ev->Cookie
@@ -2033,6 +2049,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
20332049 IShardedWriteControllerPtr ShardedWriteController = nullptr ;
20342050
20352051 TIntrusivePtr<TKqpCounters> Counters;
2052+ TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
2053+ THashMap<ui64, TInstant> SendTime;
2054+
20362055 NWilson::TSpan BufferWriteActor;
20372056 NWilson::TSpan BufferWriteActorState;
20382057};
0 commit comments