1313#include " transactions/operators/ev_write/secondary.h"
1414#include " transactions/operators/ev_write/sync.h"
1515
16+ #include < ydb/core/tx/columnshard/tablet/write_queue.h>
1617#include < ydb/core/tx/conveyor/usage/service.h>
1718#include < ydb/core/tx/data_events/events.h>
1819
@@ -46,26 +47,27 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
4647 Y_ABORT (" invalid function usage" );
4748 }
4849
49- AFL_TRACE (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " write_overload" )(" size" , writeSize)(" path_id" , writeMeta.GetTableId ())(
50+ AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " write_overload" )(" size" , writeSize)(" path_id" , writeMeta.GetTableId ())(
5051 " reason" , overloadReason);
5152
5253 ctx.Send (writeMeta.GetSource (), event.release (), 0 , cookie);
5354}
5455
55- TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded (const ui64 pathId) const {
56- if (IsAnyChannelYellowStop ()) {
57- return EOverloadStatus::Disk;
58- }
59-
56+ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedWait (const ui64 pathId) const {
6057 if (InsertTable && InsertTable->IsOverloadedByCommitted (pathId)) {
6158 return EOverloadStatus::InsertTable;
6259 }
63-
6460 Counters.GetCSCounters ().OnIndexMetadataLimit (NOlap::IColumnEngine::GetMetadataLimit ());
6561 if (TablesManager.GetPrimaryIndex () && TablesManager.GetPrimaryIndex ()->IsOverloadedByMetadata (NOlap::IColumnEngine::GetMetadataLimit ())) {
6662 return EOverloadStatus::OverloadMetadata;
6763 }
64+ return EOverloadStatus::None;
65+ }
6866
67+ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate (const ui64 pathId) const {
68+ if (IsAnyChannelYellowStop ()) {
69+ return EOverloadStatus::Disk;
70+ }
6971 ui64 txLimit = Settings.OverloadTxInFlight ;
7072 ui64 writesLimit = Settings.OverloadWritesInFlight ;
7173 ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight ;
@@ -93,7 +95,8 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
9395 NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD_WRITE)(" tablet_id" , TabletID ())(" event" , " TEvWritePortionResult" );
9496 std::vector<TNoDataWrite> noDataWrites = ev->Get ()->DetachNoDataWrites ();
9597 for (auto && i : noDataWrites) {
96- AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " no_data_write_finished" )(" writing_size" , i.GetDataSize ())(" writing_id" , i.GetWriteMeta ().GetId ());
98+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " no_data_write_finished" )(" writing_size" , i.GetDataSize ())(
99+ " writing_id" , i.GetWriteMeta ().GetId ());
97100 Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
98101 }
99102 if (ev->Get ()->GetWriteStatus () == NKikimrProto::OK) {
@@ -255,7 +258,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
255258
256259 NEvWrite::TWriteData writeData (writeMeta, arrowData, snapshotSchema->GetIndexInfo ().GetReplaceKey (),
257260 StoragesManager->GetInsertOperator ()->StartWritingAction (NOlap::NBlobOperations::EConsumer::WRITING), false );
258- auto overloadStatus = CheckOverloaded (pathId);
261+ auto overloadStatus = CheckOverloadedImmediate (pathId);
262+ if (overloadStatus == EOverloadStatus::None) {
263+ overloadStatus = CheckOverloadedWait (pathId);
264+ }
259265 if (overloadStatus != EOverloadStatus::None) {
260266 std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(
261267 TabletID (), writeData.GetWriteMeta (), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
@@ -560,11 +566,17 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
560566 return ;
561567 }
562568
563- auto overloadStatus = CheckOverloaded (pathId);
569+ if (!AppDataVerified ().ColumnShardConfig .GetWritingEnabled ()) {
570+ sendError (" writing disabled" , NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED);
571+ return ;
572+ }
573+
574+ auto overloadStatus = CheckOverloadedImmediate (pathId);
564575 if (overloadStatus != EOverloadStatus::None) {
565576 std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError (
566577 TabletID (), 0 , NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, " overload data error" );
567- OverloadWriteFail (overloadStatus, NEvWrite::TWriteMeta (0 , pathId, source, {}, TGUID::CreateTimebased ().AsGuidString ()), arrowData->GetSize (), cookie, std::move (result), ctx);
578+ OverloadWriteFail (overloadStatus, NEvWrite::TWriteMeta (0 , pathId, source, {}, TGUID::CreateTimebased ().AsGuidString ()),
579+ arrowData->GetSize (), cookie, std::move (result), ctx);
568580 return ;
569581 }
570582
@@ -580,25 +592,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
580592 lockId = record.GetLockTxId ();
581593 }
582594
583- if (!AppDataVerified ().ColumnShardConfig .GetWritingEnabled ()) {
584- sendError (" writing disabled" , NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
585- return ;
586- }
587-
588- OperationsManager->RegisterLock (lockId, Generation ());
589- auto writeOperation = OperationsManager->RegisterOperation (
590- pathId, lockId, cookie, granuleShardingVersionId, *mType , AppDataVerified ().FeatureFlags .GetEnableWritePortionsOnInsert ());
591-
592- AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , arrowData->GetSize ())(" operation_id" , writeOperation->GetIdentifier ())(
593- " in_flight" , Counters.GetWritesMonitor ()->GetWritesInFlight ())(" size_in_flight" , Counters.GetWritesMonitor ()->GetWritesSizeInFlight ());
594- Counters.GetWritesMonitor ()->OnStartWrite (arrowData->GetSize ());
595-
596- Y_ABORT_UNLESS (writeOperation);
597- writeOperation->SetBehaviour (behaviour);
598- NOlap::TWritingContext wContext (TabletID (), SelfId (), schema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
599- Counters.GetCSCounters ().WritingCounters , NOlap::TSnapshot::Max (), writeOperation->GetActivityChecker ());
600- arrowData->SetSeparationPoints (GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified (pathId)->GetBucketPositions ());
601- writeOperation->Start (*this , arrowData, source, wContext);
595+ WriteTasksQueue->Enqueue (TWriteTask (arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType , behaviour));
596+ WriteTasksQueue->Drain (false , ctx);
602597}
603598
604599} // namespace NKikimr::NColumnShard
0 commit comments