66namespace NKikimr ::NColumnShard {
77
88bool TTxWrite::InsertOneBlob (TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
9- NKikimrTxColumnShard::TLogicalMetadata meta;
10- meta.SetNumRows (batch->GetRowsCount ());
11- meta.SetRawBytes (batch->GetRawBytes ());
12- meta.SetDirtyWriteTimeSeconds (batch.GetStartInstant ().Seconds ());
13- meta.SetSpecialKeysRawData (batch->GetSpecialKeysFullSafe ());
14- meta.SetSpecialKeysPayloadData (batch->GetSpecialKeysPayloadSafe ());
15-
16- const auto & blobRange = batch.GetRange ();
17- Y_ABORT_UNLESS (blobRange.GetBlobId ().IsValid ());
9+ auto userData = batch.BuildInsertionUserData (*Self);
10+ NOlap::TInsertedData insertData (writeId, userData);
1811
19- // First write wins
2012 TBlobGroupSelector dsGroupSelector (Self->Info ());
2113 NOlap::TDbWrapper dbTable (txc.DB , &dsGroupSelector);
22-
23- const auto & writeMeta = batch.GetAggregation ().GetWriteMeta ();
24- meta.SetModificationType (TEnumOperator<NEvWrite::EModificationType>::SerializeToProto (writeMeta.GetModificationType ()));
25- *meta.MutableSchemaSubset () = batch.GetAggregation ().GetSchemaSubset ().SerializeToProto ();
26- auto schemeVersion = batch.GetAggregation ().GetSchemaVersion ();
27- auto tableSchema = Self->TablesManager .GetPrimaryIndex ()->GetVersionedIndex ().GetSchemaVerified (schemeVersion);
28-
29- auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId (), blobRange, meta, tableSchema->GetVersion (), batch->GetData ());
30- NOlap::TInsertedData insertData (writeId, userData);
3114 bool ok = Self->InsertTable ->Insert (dbTable, std::move (insertData));
3215 if (ok) {
3316 Self->UpdateInsertTableCounters ();
@@ -36,6 +19,18 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
3619 return false ;
3720}
3821
22+ bool TTxWrite::CommitOneBlob (TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
23+ auto userData = batch.BuildInsertionUserData (*Self);
24+ TBlobGroupSelector dsGroupSelector (Self->Info ());
25+ NOlap::TDbWrapper dbTable (txc.DB , &dsGroupSelector);
26+ NOlap::TCommittedData commitData (userData, Self->GetLastPlannedSnapshot (), Self->Generation (), writeId);
27+ if (Self->TablesManager .HasTable (userData->GetPathId ())) {
28+ Self->InsertTable ->CommitEphemeral (dbTable, std::move (commitData));
29+ }
30+ Self->UpdateInsertTableCounters ();
31+ return true ;
32+ }
33+
3934bool TTxWrite::Execute (TTransactionContext& txc, const TActorContext&) {
4035 TMemoryProfileGuard mpg (" TTxWrite::Execute" );
4136 NActors::TLogContextGuard logGuard =
@@ -65,10 +60,17 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
6560 operation = Self->OperationsManager ->GetOperationVerified ((TOperationWriteId)writeMeta.GetWriteId ());
6661 Y_ABORT_UNLESS (operation->GetStatus () == EOperationStatus::Started);
6762 for (auto && i : aggr->GetSplittedBlobs ()) {
68- const TInsertWriteId insertWriteId = Self->InsertTable ->BuildNextWriteId (txc);
69- aggr->AddInsertWriteId (insertWriteId);
70- AFL_VERIFY (InsertOneBlob (txc, i, insertWriteId))(" write_id" , writeMeta.GetWriteId ())(" insert_write_id" , insertWriteId)(
71- " size" , aggr->GetSplittedBlobs ().size ());
63+ if (operation->GetBehaviour () == EOperationBehaviour::NoTxWrite) {
64+ static TAtomicCounter Counter = 0 ;
65+ const TInsertWriteId insertWriteId = (TInsertWriteId)Counter.Inc ();
66+ AFL_VERIFY (CommitOneBlob (txc, i, insertWriteId))(" write_id" , writeMeta.GetWriteId ())(" insert_write_id" , insertWriteId)(
67+ " size" , aggr->GetSplittedBlobs ().size ());
68+ } else {
69+ const TInsertWriteId insertWriteId = Self->InsertTable ->BuildNextWriteId (txc);
70+ aggr->AddInsertWriteId (insertWriteId);
71+ AFL_VERIFY (InsertOneBlob (txc, i, insertWriteId))(" write_id" , writeMeta.GetWriteId ())(" insert_write_id" , insertWriteId)(
72+ " size" , aggr->GetSplittedBlobs ().size ());
73+ }
7274 }
7375 }
7476 }
@@ -92,8 +94,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
9294 if (operation->GetBehaviour () == EOperationBehaviour::NoTxWrite) {
9395 auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted (Self->TabletID ());
9496 Results.emplace_back (std::move (ev), writeMeta.GetSource (), operation->GetCookie ());
95- Self->OperationsManager ->AddTemporaryTxLink (operation->GetLockId ());
96- Self->OperationsManager ->CommitTransactionOnExecute (*Self, operation->GetLockId (), txc, Self->GetLastTxSnapshot ());
9797 } else if (operation->GetBehaviour () == EOperationBehaviour::InTxWrite) {
9898 NKikimrTxColumnShard::TCommitWriteTxBody proto;
9999 proto.SetLockId (operation->GetLockId ());
@@ -156,13 +156,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
156156 Self->GetOperationsManager ().AddEventForLock (*Self, op->GetLockId (), evWrite);
157157 }
158158 if (op->GetBehaviour () == EOperationBehaviour::NoTxWrite) {
159+ Self->OperationsManager ->AddTemporaryTxLink (op->GetLockId ());
159160 Self->OperationsManager ->CommitTransactionOnComplete (*Self, op->GetLockId (), Self->GetLastTxSnapshot ());
160161 }
161162 }
162163 Self->Counters .GetCSCounters ().OnWriteTxComplete (now - writeMeta.GetWriteStartInstant ());
163164 Self->Counters .GetCSCounters ().OnSuccessWriteResponse ();
164165 }
165166 Self->Counters .GetTabletCounters ()->IncCounter (COUNTER_IMMEDIATE_TX_COMPLETED);
167+ Self->SetupIndexation ();
166168}
167169
168170} // namespace NKikimr::NColumnShard
0 commit comments