@@ -93,12 +93,15 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
9393 NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD_WRITE)(" tablet_id" , TabletID ())(" event" , " TEvWritePortionResult" );
9494 std::vector<TNoDataWrite> noDataWrites = ev->Get ()->DetachNoDataWrites ();
9595 for (auto && i : noDataWrites) {
96+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " no_data_write_finished" )(" writing_size" , i.GetDataSize ())(" writing_id" , i.GetWriteMeta ().GetId ());
9697 Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
9798 }
9899 if (ev->Get ()->GetWriteStatus () == NKikimrProto::OK) {
99100 std::vector<TInsertedPortions> writtenPacks = ev->Get ()->DetachInsertedPacks ();
100101 const TMonotonic now = TMonotonic::Now ();
101102 for (auto && i : writtenPacks) {
103+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , i.GetDataSize ())(" event" , " data_write_finished" )(
104+ " writing_id" , i.GetWriteMeta ().GetId ());
102105 Counters.OnWritePutBlobsSuccess (now - i.GetWriteMeta ().GetWriteStartInstant (), i.GetRecordsCount ());
103106 Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
104107 }
@@ -115,6 +118,8 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
115118 for (auto && i : writtenPacks) {
116119 Counters.OnWritePutBlobsFailed (now - i.GetWriteMeta ().GetWriteStartInstant (), i.GetRecordsCount ());
117120 Counters.GetCSCounters ().OnWritePutBlobsFail (now - i.GetWriteMeta ().GetWriteStartInstant ());
121+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , i.GetDataSize ())(" event" , " data_write_error" )(
122+ " writing_id" , i.GetWriteMeta ().GetId ());
118123 Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
119124 }
120125 Execute (new TTxBlobsWritingFailed (this , ev->Get ()->GetWriteStatus (), std::move (writtenPacks)), ctx);
@@ -131,10 +136,11 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
131136 auto baseAggregations = wBuffer.GetAggregations ();
132137 wBuffer.InitReplyReceived (TMonotonic::Now ());
133138
134- Counters.GetWritesMonitor ()->OnFinishWrite (wBuffer.GetSumSize (), wBuffer.GetAggregations ().size ());
135-
136139 for (auto && aggr : baseAggregations) {
137140 const auto & writeMeta = aggr->GetWriteMeta ();
141+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " blobs_write_finished" )(" writing_size" , aggr->GetSize ())(
142+ " writing_id" , writeMeta.GetId ())(" status" , putResult.GetPutStatus ());
143+ Counters.GetWritesMonitor ()->OnFinishWrite (aggr->GetSize (), 1 );
138144
139145 if (!TablesManager.IsReadyForWrite (writeMeta.GetTableId ())) {
140146 ACFL_ERROR (" event" , " absent_pathId" )(" path_id" , writeMeta.GetTableId ())(" has_index" , TablesManager.HasPrimaryIndex ());
@@ -210,7 +216,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
210216 granuleShardingVersion = record.GetGranuleShardingVersion ();
211217 }
212218
213- NEvWrite::TWriteMeta writeMeta (writeId, pathId, source, granuleShardingVersion);
219+ NEvWrite::TWriteMeta writeMeta (writeId, pathId, source, granuleShardingVersion, TGUID::CreateTimebased (). AsGuidString () );
214220 if (record.HasModificationType ()) {
215221 writeMeta.SetModificationType (TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto (record.GetModificationType ()));
216222 }
@@ -288,7 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
288294 writeData.MutableWriteMeta ().SetWriteMiddle1StartInstant (TMonotonic::Now ());
289295
290296 NOlap::TWritingContext context (TabletID (), SelfId (), snapshotSchema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
291- Counters.GetCSCounters ().WritingCounters , GetLastTxSnapshot ());
297+ Counters.GetCSCounters ().WritingCounters , GetLastTxSnapshot (), std::make_shared<TAtomicCounter>( 1 ) );
292298 std::shared_ptr<NConveyor::ITask> task =
293299 std::make_shared<NOlap::TBuildBatchesTask>(BufferizationWriteActorId, std::move (writeData), context);
294300 NConveyor::TInsertServiceOperator::AsyncTaskToExecute (task);
@@ -460,6 +466,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
460466 const auto & record = ev->Get ()->Record ;
461467 const auto source = ev->Sender ;
462468 const auto cookie = ev->Cookie ;
469+
470+ if (!TablesManager.GetPrimaryIndex ()) {
471+ Counters.GetTabletCounters ()->IncCounter (COUNTER_WRITE_FAIL);
472+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError (
473+ TabletID (), 0 , NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, " schema not ready for writing" );
474+ ctx.Send (source, result.release (), 0 , cookie);
475+ return ;
476+ }
477+
463478 const auto behaviourConclusion = TOperationsManager::GetBehaviour (*ev->Get ());
464479 AFL_TRACE (NKikimrServices::TX_COLUMNSHARD_WRITE)(" ev_write" , record.DebugString ());
465480 if (behaviourConclusion.IsFail ()) {
@@ -560,12 +575,10 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
560575 if (overloadStatus != EOverloadStatus::None) {
561576 std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError (
562577 TabletID (), 0 , NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, " overload data error" );
563- OverloadWriteFail (overloadStatus, NEvWrite::TWriteMeta (0 , pathId, source, {}), arrowData->GetSize (), cookie, std::move (result), ctx);
578+ OverloadWriteFail (overloadStatus, NEvWrite::TWriteMeta (0 , pathId, source, {}, TGUID::CreateTimebased (). AsGuidString () ), arrowData->GetSize (), cookie, std::move (result), ctx);
564579 return ;
565580 }
566581
567- Counters.GetWritesMonitor ()->OnStartWrite (arrowData->GetSize ());
568-
569582 std::optional<ui32> granuleShardingVersionId;
570583 if (record.HasGranuleShardingVersionId ()) {
571584 granuleShardingVersionId = record.GetGranuleShardingVersionId ();
@@ -586,10 +599,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
586599 OperationsManager->RegisterLock (lockId, Generation ());
587600 auto writeOperation = OperationsManager->RegisterOperation (
588601 pathId, lockId, cookie, granuleShardingVersionId, *mType , AppDataVerified ().FeatureFlags .GetEnableWritePortionsOnInsert ());
602+
603+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , arrowData->GetSize ())(" operation_id" , writeOperation->GetIdentifier ())(
604+ " in_flight" , Counters.GetWritesMonitor ()->GetWritesInFlight ())(" size_in_flight" , Counters.GetWritesMonitor ()->GetWritesSizeInFlight ());
605+ Counters.GetWritesMonitor ()->OnStartWrite (arrowData->GetSize ());
606+
589607 Y_ABORT_UNLESS (writeOperation);
590608 writeOperation->SetBehaviour (behaviour);
591- NOlap::TWritingContext wContext (pathId , SelfId (), schema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
592- Counters.GetCSCounters ().WritingCounters , NOlap::TSnapshot::Max ());
609+ NOlap::TWritingContext wContext (TabletID () , SelfId (), schema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
610+ Counters.GetCSCounters ().WritingCounters , NOlap::TSnapshot::Max (), writeOperation-> GetActivityChecker () );
593611 arrowData->SetSeparationPoints (GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified (pathId)->GetBucketPositions ());
594612 writeOperation->Start (*this , arrowData, source, wContext);
595613}
0 commit comments