@@ -96,9 +96,10 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
9696 TInsertedPortions writtenData = ev->Get ()->DetachInsertedData ();
9797 if (ev->Get ()->GetWriteStatus () == NKikimrProto::OK) {
9898 const TMonotonic now = TMonotonic::Now ();
99- for (auto && i: writtenData.GetWriteResults ()) {
99+ for (auto && i : writtenData.GetWriteResults ()) {
100100 AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , i.GetDataSize ())(" event" , " data_write_finished" )(
101101 " writing_id" , i.GetWriteMeta ().GetId ());
102+ i.MutableWriteMeta ().OnStage (NEvWrite::EWriteStage::Finished);
102103 Counters.OnWritePutBlobsSuccess (now - i.GetWriteMeta ().GetWriteStartInstant (), i.GetRecordsCount ());
103104 Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
104105 }
@@ -111,6 +112,7 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
111112 AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , i.GetDataSize ())(" event" , " data_write_error" )(
112113 " writing_id" , i.GetWriteMeta ().GetId ());
113114 Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
115+ i.MutableWriteMeta ().OnStage (NEvWrite::EWriteStage::Finished);
114116 }
115117
116118 Execute (new TTxBlobsWritingFailed (this , ev->Get ()->GetWriteStatus (), std::move (writtenData)), ctx);
@@ -129,6 +131,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
129131
130132 for (auto && aggr : baseAggregations) {
131133 const auto & writeMeta = aggr->GetWriteMeta ();
134+ aggr->MutableWriteMeta ().OnStage (NEvWrite::EWriteStage::Finished);
132135 AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " blobs_write_finished" )(" writing_size" , aggr->GetSize ())(
133136 " writing_id" , writeMeta.GetId ())(" status" , putResult.GetPutStatus ());
134137 Counters.GetWritesMonitor ()->OnFinishWrite (aggr->GetSize (), 1 );
@@ -160,12 +163,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
160163 } else {
161164 const TMonotonic now = TMonotonic::Now ();
162165 Counters.OnWritePutBlobsSuccess (now - writeMeta.GetWriteStartInstant (), aggr->GetRows ());
163- Counters.GetCSCounters ().OnWriteMiddle1PutBlobsSuccess (now - writeMeta.GetWriteMiddle1StartInstant ());
164- Counters.GetCSCounters ().OnWriteMiddle2PutBlobsSuccess (now - writeMeta.GetWriteMiddle2StartInstant ());
165- Counters.GetCSCounters ().OnWriteMiddle3PutBlobsSuccess (now - writeMeta.GetWriteMiddle3StartInstant ());
166- Counters.GetCSCounters ().OnWriteMiddle4PutBlobsSuccess (now - writeMeta.GetWriteMiddle4StartInstant ());
167- Counters.GetCSCounters ().OnWriteMiddle5PutBlobsSuccess (now - writeMeta.GetWriteMiddle5StartInstant ());
168- Counters.GetCSCounters ().OnWriteMiddle6PutBlobsSuccess (now - writeMeta.GetWriteMiddle6StartInstant ());
169166 LOG_S_DEBUG (" Write (record) into pathId " << writeMeta.GetTableId ()
170167 << (writeMeta.GetWriteId () ? (" writeId " + ToString (writeMeta.GetWriteId ())).c_str () : " " )
171168 << " at tablet " << TabletID ());
@@ -195,7 +192,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
195192 granuleShardingVersion = record.GetGranuleShardingVersion ();
196193 }
197194
198- NEvWrite::TWriteMeta writeMeta (writeId, pathId, source, granuleShardingVersion, TGUID::CreateTimebased ().AsGuidString ());
195+ auto writeMetaPtr = std::make_shared<NEvWrite::TWriteMeta>(writeId, pathId, source, granuleShardingVersion,
196+ TGUID::CreateTimebased ().AsGuidString (), Counters.GetCSCounters ().WritingCounters ->GetWriteFlowCounters ());
197+ auto & writeMeta = *writeMetaPtr;
199198 if (record.HasModificationType ()) {
200199 writeMeta.SetModificationType (TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto (record.GetModificationType ()));
201200 }
@@ -243,7 +242,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
243242 return returnFail (COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema);
244243 }
245244
246- NEvWrite::TWriteData writeData (writeMeta , arrowData, snapshotSchema->GetIndexInfo ().GetReplaceKey (),
245+ NEvWrite::TWriteData writeData (writeMetaPtr , arrowData, snapshotSchema->GetIndexInfo ().GetReplaceKey (),
247246 StoragesManager->GetInsertOperator ()->StartWritingAction (NOlap::NBlobOperations::EConsumer::WRITING), false );
248247 auto overloadStatus = CheckOverloadedImmediate (pathId);
249248 if (overloadStatus == EOverloadStatus::None) {
@@ -273,13 +272,11 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
273272 LOG_S_DEBUG (" Write (blob) " << writeData.GetSize () << " bytes into pathId " << writeMeta.GetTableId ()
274273 << (writeMeta.GetWriteId () ? (" writeId " + ToString (writeMeta.GetWriteId ())).c_str () : " " )
275274 << Counters.GetWritesMonitor ()->DebugString () << " at tablet " << TabletID ());
276- writeData.MutableWriteMeta ().SetWriteMiddle1StartInstant (TMonotonic::Now ());
277275
278276 NOlap::TWritingContext context (TabletID (), SelfId (), snapshotSchema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
279277 Counters.GetCSCounters ().WritingCounters , GetLastTxSnapshot (), std::make_shared<TAtomicCounter>(1 ), true ,
280278 BufferizationInsertionWriteActorId, BufferizationPortionsWriteActorId);
281- std::shared_ptr<NConveyor::ITask> task =
282- std::make_shared<NOlap::TBuildBatchesTask>(std::move (writeData), context);
279+ std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(std::move (writeData), context);
283280 NConveyor::TInsertServiceOperator::AsyncTaskToExecute (task);
284281 }
285282}
@@ -571,7 +568,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
571568 if (overloadStatus != EOverloadStatus::None) {
572569 std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError (
573570 TabletID (), 0 , NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, " overload data error" );
574- OverloadWriteFail (overloadStatus, NEvWrite::TWriteMeta (0 , pathId, source, {}, TGUID::CreateTimebased ().AsGuidString ()),
571+ OverloadWriteFail (overloadStatus,
572+ NEvWrite::TWriteMeta (0 , pathId, source, {}, TGUID::CreateTimebased ().AsGuidString (),
573+ Counters.GetCSCounters ().WritingCounters ->GetWriteFlowCounters ()),
575574 arrowData->GetSize (), cookie, std::move (result), ctx);
576575 return ;
577576 }
0 commit comments