77
88namespace NKikimr ::NEvWrite {
99
10- TWritersController::TWritersController (const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId)
10+ TWritersController::TWritersController (const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite )
1111 : WritesCount(writesCount)
1212 , LongTxActorId(longTxActorId)
13+ , ImmediateWrite(immediateWrite)
1314 , LongTxId(longTxId)
1415 {
1516 Y_ABORT_UNLESS (writesCount);
@@ -39,28 +40,63 @@ namespace NKikimr::NEvWrite {
3940 }
4041 }
4142
42- TShardWriter::TShardWriter (const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data,
43- const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType )
43+ TShardWriter::TShardWriter (const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
44+ const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType , const bool immediateWrite )
4445 : ShardId(shardId)
4546 , WritePartIdx(writePartIdx)
4647 , TableId(tableId)
48+ , SchemaVersion(schemaVersion)
4749 , DedupId(dedupId)
4850 , DataForShard(data)
4951 , ExternalController(externalController)
5052 , LeaderPipeCache(MakePipePerNodeCacheID(false ))
5153 , ActorSpan(parentSpan.BuildChildrenSpan(" ShardWriter" ))
5254 , ModificationType(mType )
55+ , ImmediateWrite(immediateWrite)
5356 {
5457 }
5558
59+ void TShardWriter::SendWriteRequest () {
60+ if (ImmediateWrite) {
61+ auto ev = MakeHolder<NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
62+ DataForShard->Serialize (*ev, TableId, SchemaVersion);
63+ SendToTablet (std::move (ev));
64+ } else {
65+ auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId (), ExternalController->GetLongTxId (), TableId, DedupId, " " , WritePartIdx, ModificationType);
66+ DataForShard->Serialize (*ev);
67+ SendToTablet (std::move (ev));
68+ }
69+ }
70+
5671 void TShardWriter::Bootstrap () {
57- auto ev = MakeHolder<TEvWrite>(SelfId (), ExternalController->GetLongTxId (), TableId, DedupId, " " , WritePartIdx, ModificationType);
58- DataForShard->Serialize (*ev);
59- SendToTablet (std::move (ev));
72+ SendWriteRequest ();
6073 Become (&TShardWriter::StateMain);
74+ Schedule (TDuration::Seconds (10 ), new TEvents::TEvWakeup ());
6175 }
6276
63- void TShardWriter::Handle (TEvWriteResult::TPtr& ev) {
77+ void TShardWriter::Handle (NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
78+ const auto * msg = ev->Get ();
79+ Y_ABORT_UNLESS (msg->Record .GetOrigin () == ShardId);
80+
81+ const auto ydbStatus = msg->GetStatus ();
82+ if (ydbStatus == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) {
83+ if (RetryWriteRequest (true )) {
84+ return ;
85+ }
86+ }
87+
88+ auto gPassAway = PassAwayGuard ();
89+ if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
90+ ExternalController->OnFail (Ydb::StatusIds::INTERNAL_ERROR,
91+ TStringBuilder () << " Cannot write data into shard " << ShardId << " in longTx " <<
92+ ExternalController->GetLongTxId ().ToString ());
93+ return ;
94+ }
95+
96+ ExternalController->OnSuccess (ShardId, 0 , WritePartIdx);
97+ }
98+
99+ void TShardWriter::Handle (TEvColumnShard::TEvWriteResult::TPtr& ev) {
64100 const auto * msg = ev->Get ();
65101 Y_ABORT_UNLESS (msg->Record .GetOrigin () == ShardId);
66102
@@ -103,6 +139,7 @@ namespace NKikimr::NEvWrite {
103139
104140 void TShardWriter::HandleTimeout (const TActorContext& /* ctx*/ ) {
105141 RetryWriteRequest (false );
142+ Schedule (TDuration::Seconds (10 ), new TEvents::TEvWakeup ());
106143 }
107144
108145 bool TShardWriter::RetryWriteRequest (const bool delayed) {
@@ -113,9 +150,7 @@ namespace NKikimr::NEvWrite {
113150 Schedule (OverloadTimeout (), new TEvents::TEvWakeup ());
114151 } else {
115152 ++NumRetries;
116- auto ev = MakeHolder<TEvWrite>(SelfId (), ExternalController->GetLongTxId (), TableId, DedupId, " " , WritePartIdx, ModificationType);
117- DataForShard->Serialize (*ev);
118- SendToTablet (std::move (ev));
153+ SendWriteRequest ();
119154 }
120155 return true ;
121156 }
0 commit comments